http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java new file mode 100644 index 0000000..f5a18ac --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.storage; + +import java.util.Iterator; + +import org.apache.ratis.RaftTestUtil.SimpleOperation; +import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.util.ProtoUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestRaftLogCache { + private RaftLogCache cache; + + @Before + public void setup() { + cache = new RaftLogCache(); + } + + private LogSegment prepareLogSegment(long start, long end, boolean isOpen) { + LogSegment s = LogSegment.newOpenSegment(start); + for (long i = start; i <= end; i++) { + SimpleOperation m = new SimpleOperation("m" + i); + LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), + 0, i); + s.appendToOpenSegment(entry); + } + if (!isOpen) { + s.close(); + } + return s; + } + + private void checkCache(long start, long end, int segmentSize) { + Assert.assertEquals(start, cache.getStartIndex()); + Assert.assertEquals(end, cache.getEndIndex()); + + for (long index = start; index <= end; index++) { + LogEntryProto entry = cache.getEntry(index); + Assert.assertEquals(index, entry.getIndex()); + } + + long[] offsets = new long[]{start, start + 1, start + (end - start) / 2, + end - 1, end}; + for (long offset : offsets) { + checkCacheEntries(offset, (int) (end - offset + 1), end); + checkCacheEntries(offset, 1, end); + checkCacheEntries(offset, 20, end); + checkCacheEntries(offset, segmentSize, end); + checkCacheEntries(offset, segmentSize - 1, end); + } + } + + private void checkCacheEntries(long offset, int size, long end) { + LogEntryProto[] entries = cache.getEntries(offset, offset + size); + long realEnd = offset + size > end + 1 ? 
end + 1 : offset + size; + Assert.assertEquals(realEnd - offset, entries.length); + for (long i = offset; i < realEnd; i++) { + Assert.assertEquals(i, entries[(int) (i - offset)].getIndex()); + } + } + + @Test + public void testAddSegments() throws Exception { + LogSegment s1 = prepareLogSegment(1, 100, false); + cache.addSegment(s1); + checkCache(1, 100, 100); + + try { + LogSegment s = prepareLogSegment(102, 103, true); + cache.addSegment(s); + Assert.fail("should fail since there is gap between two segments"); + } catch (IllegalStateException ignored) { + } + + LogSegment s2 = prepareLogSegment(101, 200, true); + cache.addSegment(s2); + checkCache(1, 200, 100); + + try { + LogSegment s = prepareLogSegment(201, 202, true); + cache.addSegment(s); + Assert.fail("should fail since there is still an open segment in cache"); + } catch (IllegalStateException ignored) { + } + + cache.rollOpenSegment(false); + checkCache(1, 200, 100); + + try { + LogSegment s = prepareLogSegment(202, 203, true); + cache.addSegment(s); + Assert.fail("should fail since there is gap between two segments"); + } catch (IllegalStateException ignored) { + } + + LogSegment s3 = prepareLogSegment(201, 300, true); + cache.addSegment(s3); + Assert.assertNotNull(cache.getOpenSegment()); + checkCache(1, 300, 100); + + cache.rollOpenSegment(true); + Assert.assertNotNull(cache.getOpenSegment()); + checkCache(1, 300, 100); + } + + @Test + public void testAppendEntry() throws Exception { + LogSegment closedSegment = prepareLogSegment(0, 99, false); + cache.addSegment(closedSegment); + + final SimpleOperation m = new SimpleOperation("m"); + try { + LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), + 0, 0); + cache.appendEntry(entry); + Assert.fail("the open segment is null"); + } catch (IllegalStateException ignored) { + } + + LogSegment openSegment = prepareLogSegment(100, 100, true); + cache.addSegment(openSegment); + for (long index = 101; index < 200; index++) { + LogEntryProto 
entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), + 0, index); + cache.appendEntry(entry); + } + + Assert.assertNotNull(cache.getOpenSegment()); + checkCache(0, 199, 100); + } + + @Test + public void testTruncate() throws Exception { + long start = 0; + for (int i = 0; i < 5; i++) { // 5 closed segments + LogSegment s = prepareLogSegment(start, start + 99, false); + cache.addSegment(s); + start += 100; + } + // add another open segment + LogSegment s = prepareLogSegment(start, start + 99, true); + cache.addSegment(s); + + long end = cache.getEndIndex(); + Assert.assertEquals(599, end); + int numOfSegments = 6; + // start truncation + for (int i = 0; i < 10; i++) { // truncate 10 times + // each time truncate 37 entries + end -= 37; + TruncationSegments ts = cache.truncate(end + 1); + checkCache(0, end, 100); + + // check TruncationSegments + int currentNum= (int) (end / 100 + 1); + if (currentNum < numOfSegments) { + Assert.assertEquals(1, ts.toDelete.length); + numOfSegments = currentNum; + } else { + Assert.assertEquals(0, ts.toDelete.length); + } + } + + // 230 entries remaining. 
truncate at the segment boundary + TruncationSegments ts = cache.truncate(200); + checkCache(0, 199, 100); + Assert.assertEquals(1, ts.toDelete.length); + Assert.assertEquals(200, ts.toDelete[0].startIndex); + Assert.assertEquals(229, ts.toDelete[0].endIndex); + Assert.assertEquals(0, ts.toDelete[0].targetLength); + Assert.assertFalse(ts.toDelete[0].isOpen); + Assert.assertNull(ts.toTruncate); + + // add another open segment and truncate it as a whole + LogSegment newOpen = prepareLogSegment(200, 249, true); + cache.addSegment(newOpen); + ts = cache.truncate(200); + checkCache(0, 199, 100); + Assert.assertEquals(1, ts.toDelete.length); + Assert.assertEquals(200, ts.toDelete[0].startIndex); + Assert.assertEquals(249, ts.toDelete[0].endIndex); + Assert.assertEquals(0, ts.toDelete[0].targetLength); + Assert.assertTrue(ts.toDelete[0].isOpen); + Assert.assertNull(ts.toTruncate); + + // add another open segment and truncate part of it + newOpen = prepareLogSegment(200, 249, true); + cache.addSegment(newOpen); + ts = cache.truncate(220); + checkCache(0, 219, 100); + Assert.assertNull(cache.getOpenSegment()); + Assert.assertEquals(0, ts.toDelete.length); + Assert.assertTrue(ts.toTruncate.isOpen); + Assert.assertEquals(219, ts.toTruncate.newEndIndex); + Assert.assertEquals(200, ts.toTruncate.startIndex); + Assert.assertEquals(249, ts.toTruncate.endIndex); + } + + private void testIterator(long startIndex) { + Iterator<LogEntryProto> iterator = cache.iterator(startIndex); + LogEntryProto prev = null; + while (iterator.hasNext()) { + LogEntryProto entry = iterator.next(); + Assert.assertEquals(cache.getEntry(entry.getIndex()), entry); + if (prev != null) { + Assert.assertEquals(prev.getIndex() + 1, entry.getIndex()); + } + prev = entry; + } + if (startIndex <= cache.getEndIndex()) { + Assert.assertNotNull(prev); + Assert.assertEquals(cache.getEndIndex(), prev.getIndex()); + } + } + + @Test + public void testIterator() throws Exception { + long start = 0; + for (int i = 0; i < 
2; i++) { // 2 closed segments + LogSegment s = prepareLogSegment(start, start + 99, false); + cache.addSegment(s); + start += 100; + } + // add another open segment + LogSegment s = prepareLogSegment(start, start + 99, true); + cache.addSegment(s); + + for (long startIndex = 0; startIndex < 300; startIndex += 50) { + testIterator(startIndex); + } + testIterator(299); + + Iterator<LogEntryProto> iterator = cache.iterator(300); + Assert.assertFalse(iterator.hasNext()); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java new file mode 100644 index 0000000..bcdb958 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.storage; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.RaftTestUtil.SimpleOperation; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ChecksumException; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; +import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.ProtoUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test basic functionality of LogReader, LogInputStream, and LogOutputStream. 
+ */ +public class TestRaftLogReadWrite { + private static final Logger LOG = LoggerFactory.getLogger(TestRaftLogReadWrite.class); + + private File storageDir; + private RaftProperties properties; + private int segmentMaxSize; + + @Before + public void setup() throws Exception { + storageDir = RaftTestUtil.getTestDir(TestRaftLogReadWrite.class); + properties = new RaftProperties(); + properties.set(RAFT_SERVER_STORAGE_DIR_KEY, + FileUtils.fileAsURI(storageDir).toString()); + } + + @After + public void tearDown() throws Exception { + if (storageDir != null) { + FileUtils.fullyDelete(storageDir.getParentFile()); + } + } + + private LogEntryProto[] readLog(File file, long startIndex, long endIndex, + boolean isOpen) throws IOException { + List<LogEntryProto> list = new ArrayList<>(); + try (LogInputStream in = + new LogInputStream(file, startIndex, endIndex, isOpen)) { + LogEntryProto entry; + while ((entry = in.nextEntry()) != null) { + list.add(entry); + } + } + return list.toArray(new LogEntryProto[list.size()]); + } + + private long writeMessages(LogEntryProto[] entries, LogOutputStream out) + throws IOException { + long size = 0; + for (int i = 0; i < entries.length; i++) { + SimpleOperation m = new SimpleOperation("m" + i); + entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); + final int s = entries[i].getSerializedSize(); + size += CodedOutputStream.computeUInt32SizeNoTag(s) + s + 4; + out.write(entries[i]); + } + return size; + } + + /** + * Test basic functionality: write several log entries, then read + */ + @Test + public void testReadWriteLog() throws IOException { + RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); + File openSegment = storage.getStorageDir().getOpenLogFile(0); + long size = SegmentedRaftLog.HEADER_BYTES.length; + + final LogEntryProto[] entries = new LogEntryProto[100]; + try (LogOutputStream out = + new LogOutputStream(openSegment, false, properties)) { + size += writeMessages(entries, out); + 
} finally { + storage.close(); + } + + Assert.assertEquals(size, openSegment.length()); + + LogEntryProto[] readEntries = readLog(openSegment, 0, + RaftServerConstants.INVALID_LOG_INDEX, true); + Assert.assertArrayEquals(entries, readEntries); + } + + @Test + public void testAppendLog() throws IOException { + RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); + File openSegment = storage.getStorageDir().getOpenLogFile(0); + LogEntryProto[] entries = new LogEntryProto[200]; + try (LogOutputStream out = + new LogOutputStream(openSegment, false, properties)) { + for (int i = 0; i < 100; i++) { + SimpleOperation m = new SimpleOperation("m" + i); + entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); + out.write(entries[i]); + } + } + + try (LogOutputStream out = + new LogOutputStream(openSegment, true, properties)) { + for (int i = 100; i < 200; i++) { + SimpleOperation m = new SimpleOperation("m" + i); + entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); + out.write(entries[i]); + } + } + + LogEntryProto[] readEntries = readLog(openSegment, 0, + RaftServerConstants.INVALID_LOG_INDEX, true); + Assert.assertArrayEquals(entries, readEntries); + + storage.close(); + } + + /** + * Simulate the scenario that the peer is shutdown without truncating + * log segment file padding. Make sure the reader can correctly handle this. 
+ */ + @Test + public void testReadWithPadding() throws IOException { + RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); + File openSegment = storage.getStorageDir().getOpenLogFile(0); + long size = SegmentedRaftLog.HEADER_BYTES.length; + + LogEntryProto[] entries = new LogEntryProto[100]; + LogOutputStream out = new LogOutputStream(openSegment, false, properties); + size += writeMessages(entries, out); + out.flush(); + + // make sure the file contains padding + Assert.assertEquals(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT, + openSegment.length()); + + // check if the reader can correctly read the log file + LogEntryProto[] readEntries = readLog(openSegment, 0, + RaftServerConstants.INVALID_LOG_INDEX, true); + Assert.assertArrayEquals(entries, readEntries); + + out.close(); + Assert.assertEquals(size, openSegment.length()); + } + + /** + * corrupt the padding by inserting non-zero bytes. Make sure the reader + * throws exception. + */ + @Test + public void testReadWithCorruptPadding() throws IOException { + properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 4 * 1024 * 1024); + properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 16 * 1024 * 1024); + + RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); + File openSegment = storage.getStorageDir().getOpenLogFile(0); + + LogEntryProto[] entries = new LogEntryProto[10]; + LogOutputStream out = new LogOutputStream(openSegment, false, properties); + for (int i = 0; i < 10; i++) { + SimpleOperation m = new SimpleOperation("m" + i); + entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); + out.write(entries[i]); + } + out.flush(); + + // make sure the file contains padding + Assert.assertEquals(4 * 1024 * 1024, openSegment.length()); + + try (FileOutputStream fout = new FileOutputStream(openSegment, true)) { + ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{-1, 1}); + fout.getChannel() + .write(byteBuffer, 16 * 1024 * 1024 - 10); + } + + 
List<LogEntryProto> list = new ArrayList<>(); + try (LogInputStream in = new LogInputStream(openSegment, 0, + RaftServerConstants.INVALID_LOG_INDEX, true)) { + LogEntryProto entry; + while ((entry = in.nextEntry()) != null) { + list.add(entry); + } + Assert.fail("should fail since we corrupt the padding"); + } catch (IOException e) { + boolean findVerifyTerminator = false; + for (StackTraceElement s : e.getStackTrace()) { + if (s.getMethodName().equals("verifyTerminator")) { + findVerifyTerminator = true; + break; + } + } + Assert.assertTrue(findVerifyTerminator); + } + Assert.assertArrayEquals(entries, + list.toArray(new LogEntryProto[list.size()])); + } + + /** + * Test the log reader to make sure it can detect the checksum mismatch. + */ + @Test + public void testReadWithEntryCorruption() throws IOException { + RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); + File openSegment = storage.getStorageDir().getOpenLogFile(0); + try (LogOutputStream out = + new LogOutputStream(openSegment, false, properties)) { + for (int i = 0; i < 100; i++) { + LogEntryProto entry = ProtoUtils.toLogEntryProto( + new SimpleOperation("m" + i).getLogEntryContent(), 0, i); + out.write(entry); + } + } finally { + storage.close(); + } + + // corrupt the log file + try (RandomAccessFile raf = new RandomAccessFile(openSegment.getCanonicalFile(), + "rw")) { + raf.seek(100); + int correctValue = raf.read(); + raf.seek(100); + raf.write(correctValue + 1); + } + + try { + readLog(openSegment, 0, RaftServerConstants.INVALID_LOG_INDEX, true); + Assert.fail("The read of corrupted log file should fail"); + } catch (ChecksumException e) { + LOG.info("Caught ChecksumException as expected", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java new file mode 100644 index 0000000..3092a21 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java @@ -0,0 +1,305 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.storage; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_WRITE_BUFFER_SIZE_KEY; +import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; +import static org.apache.ratis.server.storage.LogSegment.getEntrySize; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.RaftTestUtil.SimpleOperation; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.ProtoUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test basic functionality of {@link LogSegment} + */ +public class TestRaftLogSegment { + private File storageDir; + private final RaftProperties properties = new RaftProperties(); + + @Before + public void setup() throws Exception { + storageDir = RaftTestUtil.getTestDir(TestRaftLogSegment.class); + properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, + storageDir.getCanonicalPath()); + } + + @After + public void tearDown() throws Exception { + if (storageDir != null) { + FileUtils.fullyDelete(storageDir.getParentFile()); + } + } + + private File prepareLog(boolean isOpen, long start, int size, long term) + throws IOException { + RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); + File file = isOpen ? 
storage.getStorageDir().getOpenLogFile(start) : + storage.getStorageDir().getClosedLogFile(start, start + size - 1); + + LogEntryProto[] entries = new LogEntryProto[size]; + try (LogOutputStream out = new LogOutputStream(file, false, properties)) { + for (int i = 0; i < size; i++) { + SimpleOperation op = new SimpleOperation("m" + i); + entries[i] = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), + term, i + start); + out.write(entries[i]); + } + } + storage.close(); + return file; + } + + private void checkLogSegment(LogSegment segment, long start, long end, + boolean isOpen, long totalSize, long term) { + Assert.assertEquals(start, segment.getStartIndex()); + Assert.assertEquals(end, segment.getEndIndex()); + Assert.assertEquals(isOpen, segment.isOpen()); + Assert.assertEquals(totalSize, segment.getTotalSize()); + + long offset = SegmentedRaftLog.HEADER_BYTES.length; + for (long i = start; i <= end; i++) { + LogSegment.LogRecord record = segment.getLogRecord(i); + Assert.assertEquals(i, record.entry.getIndex()); + Assert.assertEquals(term, record.entry.getTerm()); + Assert.assertEquals(offset, record.offset); + + offset += getEntrySize(record.entry); + } + } + + @Test + public void testLoadLogSegment() throws Exception { + // load an open segment + File openSegmentFile = prepareLog(true, 0, 100, 0); + LogSegment openSegment = LogSegment.loadSegment(openSegmentFile, 0, + INVALID_LOG_INDEX, true, null); + checkLogSegment(openSegment, 0, 99, true, openSegmentFile.length(), 0); + + // load a closed segment (1000-1099) + File closedSegmentFile = prepareLog(false, 1000, 100, 1); + LogSegment closedSegment = LogSegment.loadSegment(closedSegmentFile, 1000, + 1099, false, null); + checkLogSegment(closedSegment, 1000, 1099, false, + closedSegment.getTotalSize(), 1); + } + + @Test + public void testAppendEntries() throws Exception { + final long start = 1000; + LogSegment segment = LogSegment.newOpenSegment(start); + long size = SegmentedRaftLog.HEADER_BYTES.length; + 
final long max = 8 * 1024 * 1024; + checkLogSegment(segment, start, start - 1, true, size, 0); + + // append till full + long term = 0; + int i = 0; + List<LogEntryProto> list = new ArrayList<>(); + while (size < max) { + SimpleOperation op = new SimpleOperation("m" + i); + LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), + term, i++ + start); + size += getEntrySize(entry); + list.add(entry); + } + + segment.appendToOpenSegment(list.toArray(new LogEntryProto[list.size()])); + Assert.assertTrue(segment.getTotalSize() >= max); + checkLogSegment(segment, start, i - 1 + start, true, size, term); + } + + @Test + public void testAppendWithGap() throws Exception { + LogSegment segment = LogSegment.newOpenSegment(1000); + SimpleOperation op = new SimpleOperation("m"); + final SMLogEntryProto m = op.getLogEntryContent(); + try { + LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1001); + segment.appendToOpenSegment(entry); + Assert.fail("should fail since the entry's index needs to be 1000"); + } catch (Exception e) { + Assert.assertTrue(e instanceof IllegalArgumentException); + } + + LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1000); + segment.appendToOpenSegment(entry); + + try { + entry = ProtoUtils.toLogEntryProto(m, 0, 1002); + segment.appendToOpenSegment(entry); + Assert.fail("should fail since the entry's index needs to be 1001"); + } catch (Exception e) { + Assert.assertTrue(e instanceof IllegalArgumentException); + } + + LogEntryProto[] entries = new LogEntryProto[2]; + for (int i = 0; i < 2; i++) { + entries[i] = ProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2); + } + try { + segment.appendToOpenSegment(entries); + Assert.fail("should fail since there is gap between entries"); + } catch (Exception e) { + Assert.assertTrue(e instanceof IllegalArgumentException); + } + } + + @Test + public void testTruncate() throws Exception { + final long term = 1; + final long start = 1000; + LogSegment segment = 
LogSegment.newOpenSegment(start); + for (int i = 0; i < 100; i++) { + LogEntryProto entry = ProtoUtils.toLogEntryProto( + new SimpleOperation("m" + i).getLogEntryContent(), term, i + start); + segment.appendToOpenSegment(entry); + } + + // truncate an open segment (remove 1080~1099) + long newSize = segment.getLogRecord(start + 80).offset; + segment.truncate(start + 80); + Assert.assertEquals(80, segment.numOfEntries()); + checkLogSegment(segment, start, start + 79, false, newSize, term); + + // truncate a closed segment (remove 1050~1079) + newSize = segment.getLogRecord(start + 50).offset; + segment.truncate(start + 50); + Assert.assertEquals(50, segment.numOfEntries()); + checkLogSegment(segment, start, start + 49, false, newSize, term); + + // truncate all the remaining entries + segment.truncate(start); + Assert.assertEquals(0, segment.numOfEntries()); + checkLogSegment(segment, start, start - 1, false, + SegmentedRaftLog.HEADER_BYTES.length, term); + } + + private RaftProperties getProperties(long maxSegmentSize, + long preallocatedSize) { + RaftProperties p = new RaftProperties(); + p.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, + maxSegmentSize); + p.setLong(RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, + preallocatedSize); + return p; + } + + @Test + public void testPreallocateSegment() throws Exception { + RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); + final File file = storage.getStorageDir().getOpenLogFile(0); + final int[] maxSizes = new int[]{1024, 1025, 1024 * 1024 - 1, 1024 * 1024, + 1024 * 1024 + 1, 2 * 1024 * 1024 - 1, 2 * 1024 * 1024, + 2 * 1024 * 1024 + 1, 8 * 1024 * 1024}; + final int[] preallocated = new int[]{512, 1024, 1025, 1024 * 1024, + 1024 * 1024 + 1, 2 * 1024 * 1024}; + + // make sure preallocation is correct with different max/pre-allocated size + for (int max : maxSizes) { + for (int a : preallocated) { + try (LogOutputStream ignored = + new LogOutputStream(file, false, getProperties(max, a))) { 
+ Assert.assertEquals(file.length(), Math.min(max, a)); + } + try (LogInputStream in = + new LogInputStream(file, 0, INVALID_LOG_INDEX, true)) { + LogEntryProto entry = in.nextEntry(); + Assert.assertNull(entry); + } + } + } + + // test the scenario where an entry's size is larger than the max size + final byte[] content = new byte[1024 * 2]; + Arrays.fill(content, (byte) 1); + final long size; + try (LogOutputStream out = new LogOutputStream(file, false, + getProperties(1024, 1024))) { + SimpleOperation op = new SimpleOperation(new String(content)); + LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), + 0, 0); + size = LogSegment.getEntrySize(entry); + out.write(entry); + } + Assert.assertEquals(file.length(), + size + SegmentedRaftLog.HEADER_BYTES.length); + try (LogInputStream in = new LogInputStream(file, 0, + INVALID_LOG_INDEX, true)) { + LogEntryProto entry = in.nextEntry(); + Assert.assertArrayEquals(content, + entry.getSmLogEntry().getData().toByteArray()); + Assert.assertNull(in.nextEntry()); + } + } + + /** + * Keep appending and check if pre-allocation is correct + */ + @Test + public void testPreallocationAndAppend() throws Exception { + final long max = 2 * 1024 * 1024; + properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, max); + properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 16 * 1024); + properties.setLong(RAFT_LOG_WRITE_BUFFER_SIZE_KEY, 10 * 1024); + RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); + final File file = storage.getStorageDir().getOpenLogFile(0); + + final byte[] content = new byte[1024]; + Arrays.fill(content, (byte) 1); + SimpleOperation op = new SimpleOperation(new String(content)); + LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), + 0, 0); + final long entrySize = LogSegment.getEntrySize(entry); + + long totalSize = SegmentedRaftLog.HEADER_BYTES.length; + long preallocated = 16 * 1024; + try (LogOutputStream out = new LogOutputStream(file, false, 
properties)) { + Assert.assertEquals(preallocated, file.length()); + while (totalSize + entrySize < max) { + totalSize += entrySize; + out.write(entry); + if (totalSize > preallocated) { + Assert.assertEquals("totalSize==" + totalSize, + preallocated + 16 * 1024, file.length()); + preallocated += 16 * 1024; + } + } + } + + Assert.assertEquals(totalSize, file.length()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java new file mode 100644 index 0000000..a51e933 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.io.nativeio.NativeIO; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.MetaFile; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.server.storage.RaftStorageDirectory; +import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState; +import org.apache.ratis.statemachine.SimpleStateMachineStorage; +import org.apache.ratis.util.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Test RaftStorage and RaftStorageDirectory + */ +public class TestRaftStorage { + private File storageDir; + private final RaftProperties properties = new RaftProperties(); + + @Before + public void setup() throws Exception { + storageDir = RaftTestUtil.getTestDir(TestRaftStorage.class); + properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, + storageDir.getCanonicalPath()); + } + + @After + public void tearDown() throws Exception { + if (storageDir != null) { + FileUtils.fullyDelete(storageDir.getParentFile()); + } + } + + @Test + public void testNotExistent() throws IOException { + FileUtils.fullyDelete(storageDir); + + // we will format the empty directory + RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); + Assert.assertEquals(StorageState.NORMAL, storage.getState()); + + try { + new RaftStorage(properties, StartupOption.FORMAT).close(); + Assert.fail("the format should fail since the storage is still locked"); + } catch (IOException e) { + 
Assert.assertTrue(e.getMessage().contains("directory is already locked")); + } + + storage.close(); + FileUtils.fullyDelete(storageDir); + Assert.assertTrue(storageDir.createNewFile()); + try { + new RaftStorage(properties, StartupOption.REGULAR); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue( + e.getMessage().contains(StorageState.NON_EXISTENT.name())); + } + } + + /** + * make sure the RaftStorage format works + */ + @Test + public void testStorage() throws Exception { + RaftStorageDirectory sd = new RaftStorageDirectory(storageDir); + try { + StorageState state = sd.analyzeStorage(true); + Assert.assertEquals(StorageState.NOT_FORMATTED, state); + Assert.assertTrue(sd.isCurrentEmpty()); + } finally { + sd.unlock(); + } + + RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); + Assert.assertEquals(StorageState.NORMAL, storage.getState()); + storage.close(); + + Assert.assertEquals(StorageState.NORMAL, sd.analyzeStorage(false)); + File m = sd.getMetaFile(); + Assert.assertTrue(m.exists()); + MetaFile metaFile = new MetaFile(m); + Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm()); + Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor()); + + metaFile.set(123, "peer1"); + metaFile.readFile(); + Assert.assertEquals(123, metaFile.getTerm()); + Assert.assertEquals("peer1", metaFile.getVotedFor()); + + MetaFile metaFile2 = new MetaFile(m); + Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded")); + Assert.assertEquals(123, metaFile.getTerm()); + Assert.assertEquals("peer1", metaFile.getVotedFor()); + + // test format + storage = new RaftStorage(properties, StartupOption.FORMAT); + Assert.assertEquals(StorageState.NORMAL, storage.getState()); + metaFile = new MetaFile(sd.getMetaFile()); + Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm()); + Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor()); + storage.close(); + } + + @Test + public void testMetaFile() 
throws Exception { + RaftStorage storage = new RaftStorage(properties, StartupOption.FORMAT); + File m = storage.getStorageDir().getMetaFile(); + Assert.assertTrue(m.exists()); + MetaFile metaFile = new MetaFile(m); + Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm()); + Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor()); + + metaFile.set(123, "peer1"); + metaFile.readFile(); + Assert.assertEquals(123, metaFile.getTerm()); + Assert.assertEquals("peer1", metaFile.getVotedFor()); + + MetaFile metaFile2 = new MetaFile(m); + Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded")); + Assert.assertEquals(123, metaFile.getTerm()); + Assert.assertEquals("peer1", metaFile.getVotedFor()); + + storage.close(); + } + + /** + * check if RaftStorage deletes tmp metafile when startup + */ + @Test + public void testCleanMetaTmpFile() throws Exception { + RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); + Assert.assertEquals(StorageState.NORMAL, storage.getState()); + storage.close(); + + RaftStorageDirectory sd = new RaftStorageDirectory(storageDir); + File metaFile = sd.getMetaFile(); + NativeIO.renameTo(metaFile, sd.getMetaTmpFile()); + + Assert.assertEquals(StorageState.NOT_FORMATTED, sd.analyzeStorage(false)); + + try { + new RaftStorage(properties, StartupOption.REGULAR); + Assert.fail("should throw IOException since storage dir is not formatted"); + } catch (IOException e) { + Assert.assertTrue( + e.getMessage().contains(StorageState.NOT_FORMATTED.name())); + } + + // let the storage dir contain both raft-meta and raft-meta.tmp + new RaftStorage(properties, StartupOption.FORMAT).close(); + Assert.assertTrue(sd.getMetaFile().exists()); + Assert.assertTrue(sd.getMetaTmpFile().createNewFile()); + Assert.assertTrue(sd.getMetaTmpFile().exists()); + try { + storage = new RaftStorage(properties, StartupOption.REGULAR); + Assert.assertEquals(StorageState.NORMAL, storage.getState()); + 
Assert.assertFalse(sd.getMetaTmpFile().exists()); + Assert.assertTrue(sd.getMetaFile().exists()); + } finally { + storage.close(); + } + } + + @Test + public void testSnapshotFileName() throws Exception { + final long term = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + final long index = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + final String name = SimpleStateMachineStorage.getSnapshotFileName(term, index); + System.out.println("name = " + name); + final File file = new File(storageDir, name); + final TermIndex ti = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(file); + System.out.println("file = " + file); + Assert.assertEquals(term, ti.getTerm()); + Assert.assertEquals(index, ti.getIndex()); + System.out.println("ti = " + ti); + + final File foo = new File(storageDir, "foo"); + try { + SimpleStateMachineStorage.getTermIndexFromSnapshotFile(foo); + Assert.fail(); + } catch(IllegalArgumentException iae) { + System.out.println("Good " + iae); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java new file mode 100644 index 0000000..405a1a5 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +import org.apache.log4j.Level; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.RaftTestUtil.SimpleOperation; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.ConfigurationManager; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.ProtoUtils; +import org.apache.ratis.util.RaftUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestSegmentedRaftLog { + static { + RaftUtils.setLogLevel(RaftLogWorker.LOG, Level.DEBUG); + } + + private static final String peerId = "s0"; + + private static class SegmentRange { + final long start; + final long end; + final long term; + final boolean isOpen; + + SegmentRange(long s, long e, long term, boolean isOpen) { + this.start = s; + this.end = e; + this.term = term; + this.isOpen = isOpen; + } + } + + private File storageDir; + 
private RaftProperties properties; + private RaftStorage storage; + private final ConfigurationManager cm = new ConfigurationManager( + MiniRaftCluster.initConfiguration(MiniRaftCluster.generateIds(3, 0))); + + @Before + public void setup() throws Exception { + storageDir = RaftTestUtil.getTestDir(TestSegmentedRaftLog.class); + properties = new RaftProperties(); + properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, + storageDir.getCanonicalPath()); + storage = new RaftStorage(properties, RaftServerConstants.StartupOption.REGULAR); + } + + @After + public void tearDown() throws Exception { + if (storageDir != null) { + FileUtils.fullyDelete(storageDir.getParentFile()); + } + } + + private LogEntryProto[] prepareLog(List<SegmentRange> list) throws IOException { + List<LogEntryProto> entryList = new ArrayList<>(); + for (SegmentRange range : list) { + File file = range.isOpen ? + storage.getStorageDir().getOpenLogFile(range.start) : + storage.getStorageDir().getClosedLogFile(range.start, range.end); + + final int size = (int) (range.end - range.start + 1); + LogEntryProto[] entries = new LogEntryProto[size]; + try (LogOutputStream out = new LogOutputStream(file, false, properties)) { + for (int i = 0; i < size; i++) { + SimpleOperation m = new SimpleOperation("m" + (i + range.start)); + entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), + range.term, i + range.start); + out.write(entries[i]); + } + } + Collections.addAll(entryList, entries); + } + return entryList.toArray(new LogEntryProto[entryList.size()]); + } + + private List<SegmentRange> prepareRanges(int number, int segmentSize, + long startIndex) { + List<SegmentRange> list = new ArrayList<>(number); + for (int i = 0; i < number; i++) { + list.add(new SegmentRange(startIndex, startIndex + segmentSize - 1, i, + i == number - 1)); + startIndex += segmentSize; + } + return list; + } + + @Test + public void testLoadLogSegments() throws Exception { + // first generate log files + 
List<SegmentRange> ranges = prepareRanges(5, 100, 0); + LogEntryProto[] entries = prepareLog(ranges); + + // create RaftLog object and load log file + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + // check if log entries are loaded correctly + for (LogEntryProto e : entries) { + LogEntryProto entry = raftLog.get(e.getIndex()); + Assert.assertEquals(e, entry); + } + + Assert.assertArrayEquals(entries, raftLog.getEntries(0, 500)); + Assert.assertEquals(entries[entries.length - 1], raftLog.getLastEntry()); + } + } + + List<LogEntryProto> prepareLogEntries(List<SegmentRange> slist, + Supplier<String> stringSupplier) { + List<LogEntryProto> eList = new ArrayList<>(); + for (SegmentRange range : slist) { + for (long index = range.start; index <= range.end; index++) { + SimpleOperation m = stringSupplier == null ? + new SimpleOperation("m" + index) : + new SimpleOperation(stringSupplier.get()); + eList.add(ProtoUtils.toLogEntryProto(m.getLogEntryContent(), + range.term, index)); + } + } + return eList; + } + + /** + * Append entry one by one and check if log state is correct. + */ + @Test + public void testAppendEntry() throws Exception { + List<SegmentRange> ranges = prepareRanges(5, 200, 0); + List<LogEntryProto> entries = prepareLogEntries(ranges, null); + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + // append entries to the raftlog + entries.forEach(raftLog::appendEntry); + raftLog.logSync(); + } + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + // check if the raft log is correct + checkEntries(raftLog, entries, 0, entries.size()); + } + } + + /** + * Keep appending entries, make sure the rolling is correct. 
+ */ + @Test + public void testAppendAndRoll() throws Exception { + properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 16 * 1024); + properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 128 * 1024); + + List<SegmentRange> ranges = prepareRanges(1, 1024, 0); + final byte[] content = new byte[1024]; + List<LogEntryProto> entries = prepareLogEntries(ranges, + () -> new String(content)); + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + // append entries to the raftlog + entries.forEach(raftLog::appendEntry); + raftLog.logSync(); + } + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + // check if the raft log is correct + checkEntries(raftLog, entries, 0, entries.size()); + Assert.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments()); + } + } + + @Test + public void testTruncate() throws Exception { + // prepare the log for truncation + List<SegmentRange> ranges = prepareRanges(5, 200, 0); + List<LogEntryProto> entries = prepareLogEntries(ranges, null); + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + // append entries to the raftlog + entries.forEach(raftLog::appendEntry); + raftLog.logSync(); + } + + for (long fromIndex = 900; fromIndex >= 0; fromIndex -= 150) { + testTruncate(entries, fromIndex); + } + } + + private void testTruncate(List<LogEntryProto> entries, long fromIndex) + throws Exception { + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + // truncate the log + raftLog.truncate(fromIndex); + raftLog.logSync(); + + checkEntries(raftLog, entries, 0, (int) fromIndex); + } + + try (SegmentedRaftLog raftLog = + 
new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + // check if the raft log is correct + if (fromIndex > 0) { + Assert.assertEquals(entries.get((int) (fromIndex - 1)), + raftLog.getLastEntry()); + } else { + Assert.assertNull(raftLog.getLastEntry()); + } + checkEntries(raftLog, entries, 0, (int) fromIndex); + } + } + + private void checkEntries(RaftLog raftLog, List<LogEntryProto> expected, + int offset, int size) { + if (size > 0) { + for (int i = offset; i < size + offset; i++) { + LogEntryProto entry = raftLog.get(expected.get(i).getIndex()); + Assert.assertEquals(expected.get(i), entry); + } + LogEntryProto[] entriesFromLog = raftLog.getEntries( + expected.get(offset).getIndex(), + expected.get(offset + size - 1).getIndex() + 1); + LogEntryProto[] expectedArray = expected.subList(offset, offset + size) + .toArray(SegmentedRaftLog.EMPTY_LOGENTRY_ARRAY); + Assert.assertArrayEquals(expectedArray, entriesFromLog); + } + } + + /** + * Test append with inconsistent entries + */ + @Test + public void testAppendEntriesWithInconsistency() throws Exception { + // prepare the log for truncation + List<SegmentRange> ranges = prepareRanges(5, 200, 0); + List<LogEntryProto> entries = prepareLogEntries(ranges, null); + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + // append entries to the raftlog + entries.forEach(raftLog::appendEntry); + raftLog.logSync(); + } + + // append entries whose first 100 entries are the same with existing log, + // and the next 100 are with different term + SegmentRange r1 = new SegmentRange(550, 599, 2, false); + SegmentRange r2 = new SegmentRange(600, 649, 3, false); + SegmentRange r3 = new SegmentRange(650, 749, 10, false); + List<LogEntryProto> newEntries = prepareLogEntries( + Arrays.asList(r1, r2, r3), null); + + try (SegmentedRaftLog raftLog = + new 
SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()])); + raftLog.logSync(); + + checkEntries(raftLog, entries, 0, 650); + checkEntries(raftLog, newEntries, 100, 100); + Assert.assertEquals(newEntries.get(newEntries.size() - 1), + raftLog.getLastEntry()); + Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), + raftLog.getLatestFlushedIndex()); + } + + // load the raftlog again and check + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); + checkEntries(raftLog, entries, 0, 650); + checkEntries(raftLog, newEntries, 100, 100); + Assert.assertEquals(newEntries.get(newEntries.size() - 1), + raftLog.getLastEntry()); + Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), + raftLog.getLatestFlushedIndex()); + + RaftLogCache cache = raftLog.getRaftLogCache(); + Assert.assertEquals(5, cache.getNumOfSegments()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java new file mode 100644 index 0000000..6854b42 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.statemachine; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY; +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.log4j.Level; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.RaftTestUtil.SimpleMessage; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerTestUtil; +import org.apache.ratis.server.simulation.RequestHandler; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.server.storage.RaftStorageDirectory; +import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.RaftUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class RaftSnapshotBaseTest { + static { + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + + static final Logger LOG = LoggerFactory.getLogger(RaftSnapshotBaseTest.class); + private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10; + + static File getSnapshotFile(MiniRaftCluster cluster, int i) { + final RaftServerImpl leader = cluster.getLeader(); + final SimpleStateMachine4Testing sm = SimpleStateMachine4Testing.get(leader); + return sm.getStateMachineStorage().getSnapshotFile( + leader.getState().getCurrentTerm(), i); + } + + static void assertLeaderContent(MiniRaftCluster cluster) + throws InterruptedException { + final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster); + Assert.assertEquals(SNAPSHOT_TRIGGER_THRESHOLD * 2, + leader.getState().getLog().getLastCommittedIndex()); + final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader).getContent(); + + for (int i = 1; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { + Assert.assertEquals(i+1, entries[i].getIndex()); + Assert.assertArrayEquals( + new SimpleMessage("m" + i).getContent().toByteArray(), + entries[i].getSmLogEntry().getData().toByteArray()); + } + } + + private MiniRaftCluster cluster; + + public abstract MiniRaftCluster initCluster(int numServer, RaftProperties prop) + throws IOException; + + @Before + public void setup() throws IOException { + final RaftProperties prop = new RaftProperties(); + prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + SimpleStateMachine4Testing.class, StateMachine.class); + prop.setLong(RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY, + SNAPSHOT_TRIGGER_THRESHOLD); + prop.setBoolean(RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY, true); + this.cluster = initCluster(1, prop); + cluster.start(); + } + + @After 
+ public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Keep generating writing traffic and make sure snapshots are taken. + * We then restart the whole raft peer and check if it can correctly load + * snapshots + raft log. + */ + @Test + public void testRestartPeer() throws Exception { + RaftTestUtil.waitForLeader(cluster); + final String leaderId = cluster.getLeader().getId(); + int i = 0; + try(final RaftClient client = cluster.createClient("client", leaderId)) { + for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { + RaftClientReply reply = client.send(new SimpleMessage("m" + i)); + Assert.assertTrue(reply.isSuccess()); + } + } + + // wait for the snapshot to be done + final File snapshotFile = getSnapshotFile(cluster, i); + + int retries = 0; + do { + Thread.sleep(1000); + } while (!snapshotFile.exists() && retries++ < 10); + + Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists()); + + // restart the peer and check if it can correctly load snapshot + cluster.restart(false); + try { + // 200 messages + two leader elections --> last committed = 201 + assertLeaderContent(cluster); + } finally { + cluster.shutdown(); + } + } + + /** + * Basic test for install snapshot: start a one node cluster and let it + * generate a snapshot. Then delete the log and restart the node, and add more + * nodes as followers. 
+ */ + @Test + public void testBasicInstallSnapshot() throws Exception { + List<LogPathAndIndex> logs = new ArrayList<>(); + try { + RaftTestUtil.waitForLeader(cluster); + final String leaderId = cluster.getLeader().getId(); + + int i = 0; + try(final RaftClient client = cluster.createClient("client", leaderId)) { + for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { + RaftClientReply reply = client.send(new SimpleMessage("m" + i)); + Assert.assertTrue(reply.isSuccess()); + } + } + + // wait for the snapshot to be done + RaftStorageDirectory storageDirectory = cluster.getLeader().getState() + .getStorage().getStorageDir(); + final File snapshotFile = getSnapshotFile(cluster, i); + logs = storageDirectory.getLogSegmentFiles(); + + int retries = 0; + do { + Thread.sleep(1000); + } while (!snapshotFile.exists() && retries++ < 10); + + Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists()); + } finally { + cluster.shutdown(); + } + + // delete the log segments from the leader + for (LogPathAndIndex path : logs) { + FileUtils.deleteFile(path.path.toFile()); + } + + // restart the peer + LOG.info("Restarting the cluster"); + cluster.restart(false); + try { + assertLeaderContent(cluster); + + // generate some more traffic + try(final RaftClient client = cluster.createClient("client", + cluster.getLeader().getId())) { + Assert.assertTrue(client.send(new SimpleMessage("test")).isSuccess()); + } + + // add two more peers + MiniRaftCluster.PeerChanges change = cluster.addNewPeers( + new String[]{"s3", "s4"}, true); + // trigger setConfiguration + SetConfigurationRequest request = new SetConfigurationRequest("client", + cluster.getLeader().getId(), DEFAULT_SEQNUM, change.allPeersInNewConf); + LOG.info("Start changing the configuration: {}", request); + cluster.getLeader().setConfiguration(request); + + RaftServerTestUtil.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); + } finally { + cluster.shutdown(); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java new file mode 100644 index 0000000..cc82371 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.statemachine; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.apache.ratis.RaftTestUtil.SimpleMessage; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.io.MD5Hash; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerTestUtil; +import org.apache.ratis.server.impl.ServerProtoUtils; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.LogInputStream; +import org.apache.ratis.server.storage.LogOutputStream; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.util.Daemon; +import org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.MD5FileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A {@link StateMachine} implementation example that simply stores all the log + * entries in a list. Mainly used for test. + * + * For snapshot it simply merges all the log segments together. 
+ */
public class SimpleStateMachine4Testing extends BaseStateMachine {
  /** Minimum number of newly applied indices before the checkpointer takes a snapshot. */
  static volatile int SNAPSHOT_THRESHOLD = 100;
  static final Logger LOG = LoggerFactory.getLogger(SimpleStateMachine4Testing.class);
  public static final String RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_KEY
      = "raft.test.simple.state.machine.take.snapshot";
  public static final boolean RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_DEFAULT = false;

  /** Interval, in milliseconds, between two checks of the checkpointer. */
  private static final long CHECKPOINT_INTERVAL_MS = 1000;

  /**
   * @return the state machine of the given server, cast to this class.
   */
  public static SimpleStateMachine4Testing get(RaftServerImpl s) {
    return (SimpleStateMachine4Testing)RaftServerTestUtil.getStateMachine(s);
  }

  /** All applied (or loaded) log entries, in the order they were added. */
  private final List<LogEntryProto> list =
      Collections.synchronizedList(new ArrayList<>());
  private final Daemon checkpointer;
  private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
  private final TermIndexTracker termIndexTracker = new TermIndexTracker();
  private final RaftProperties properties = new RaftProperties();

  private volatile boolean running = true;
  /** Written by the checkpointer and by loadSnapshot, hence volatile. */
  private volatile long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX;
  /**
   * Set once the checkpointer has been started. Only touched from the body
   * that initialize() passes to startAndTransition.
   */
  private boolean checkpointerStarted = false;

  SimpleStateMachine4Testing() {
    checkpointer = new Daemon(() -> {
      while (running) {
        try {
          if (shouldTakeSnapshot()) {
            endIndexLastCkpt = takeSnapshot();
          }
        } catch (IOException ioe) {
          LOG.warn("Received IOException in Checkpointer", ioe);
        }
        // Sleep outside the try above so that a failing snapshot does not
        // make this loop spin without pause.
        try {
          Thread.sleep(CHECKPOINT_INTERVAL_MS);
        } catch (InterruptedException e) {
          // close() interrupts this thread; keep the flag and stop looping.
          Thread.currentThread().interrupt();
          return;
        }
      }
    });
  }

  /**
   * @return true if the list is non-empty and the index of its last entry is
   *         at least SNAPSHOT_THRESHOLD ahead of the last checkpoint.
   */
  private boolean shouldTakeSnapshot() {
    final long lastIndex;
    // isEmpty/size/get must be one atomic step with respect to concurrent adds.
    synchronized (list) {
      if (list.isEmpty()) {
        return false;
      }
      lastIndex = list.get(list.size() - 1).getIndex();
    }
    return lastIndex - endIndexLastCkpt >= SNAPSHOT_THRESHOLD;
  }

  /**
   * Initializes the base state machine and the storage, loads the latest
   * snapshot and, if enabled by
   * {@link #RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_KEY}, starts the
   * checkpointer.
   */
  @Override
  public synchronized void initialize(String id, RaftProperties properties,
      RaftStorage raftStorage) throws IOException {
    LOG.info("Initializing " + getClass().getSimpleName() + ":" + id);
    lifeCycle.startAndTransition(() -> {
      super.initialize(id, properties, raftStorage);
      storage.init(raftStorage);
      loadSnapshot(storage.findLatestSnapshot());

      if (properties.getBoolean(
          RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_KEY,
          RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_DEFAULT)) {
        // reinitialize() comes through here again; starting the checkpointer
        // twice would fail (assuming Daemon is a Thread -- TODO confirm).
        if (!checkpointerStarted) {
          checkpointerStarted = true;
          checkpointer.start();
        }
      }
    });
  }

  /** Moves the life cycle to PAUSING and then to PAUSED. */
  @Override
  public synchronized void pause() {
    lifeCycle.transition(LifeCycle.State.PAUSING);
    lifeCycle.transition(LifeCycle.State.PAUSED);
  }

  /** Logs and then delegates to {@link #initialize}. */
  @Override
  public synchronized void reinitialize(String id, RaftProperties properties,
      RaftStorage storage) throws IOException {
    LOG.info("Reinitializing " + getClass().getSimpleName() + ":" + id);
    initialize(id, properties, storage);
  }

  /**
   * Appends the log entry of the transaction to the list and advances the
   * term-index tracker.
   *
   * @return a completed future carrying the message "&lt;index&gt; OK".
   */
  @Override
  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
    LogEntryProto entry = trx.getLogEntry().get();
    Preconditions.checkNotNull(entry);
    list.add(entry);
    termIndexTracker.update(ServerProtoUtils.toTermIndex(entry));
    return CompletableFuture.completedFuture(
        new SimpleMessage(entry.getIndex() + " OK"));
  }

  /**
   * Writes all entries up to the latest tracked index into a snapshot file,
   * stores its MD5 and asks the storage to reload the latest snapshot.
   *
   * @return the index of the last entry covered by the snapshot, or
   *         INVALID_LOG_INDEX if the tracked term or index is not positive.
   */
  @Override
  public long takeSnapshot() throws IOException {
    TermIndex termIndex = termIndexTracker.getLatestTermIndex();
    if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) {
      return RaftServerConstants.INVALID_LOG_INDEX;
    }
    final long endIndex = termIndex.getIndex();

    // TODO: snapshot should be written to a tmp file, then renamed
    File snapshotFile = storage.getSnapshotFile(termIndex.getTerm(),
        termIndex.getIndex());
    LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}", termIndex.getTerm(),
        termIndex.getIndex(), snapshotFile);

    // Iterating a synchronizedList is not safe while applyTransaction adds to
    // it; work on a copy taken in a single synchronized call instead.
    final LogEntryProto[] entries = list.toArray(new LogEntryProto[0]);
    try (LogOutputStream out = new LogOutputStream(snapshotFile, false, properties)) {
      for (final LogEntryProto entry : entries) {
        if (entry.getIndex() > endIndex) {
          break;
        }
        out.write(entry);
      }
      out.flush();
    } catch (IOException e) {
      // NOTE(review): a failed write is only logged; the MD5 step and the
      // return value below proceed as if the snapshot existed -- confirm
      // whether this best-effort behavior is intended.
      LOG.warn("Failed to take snapshot", e);
    }

    try {
      final MD5Hash digest = MD5FileUtil.computeMd5ForFile(snapshotFile);
      MD5FileUtil.saveMD5File(snapshotFile, digest);
    } catch (IOException e) {
      LOG.warn("Hit IOException when computing MD5 for snapshot file "
          + snapshotFile, e);
    }

    try {
      this.storage.loadLatestSnapshot();
    } catch (IOException e) {
      LOG.warn("Hit IOException when loading latest snapshot for snapshot file "
          + snapshotFile, e);
    }
    // TODO: purge log segments
    return endIndex;
  }

  @Override
  public SimpleStateMachineStorage getStateMachineStorage() {
    return storage;
  }

  /**
   * Reads every entry of the given snapshot file into the list.
   *
   * @param snapshot the snapshot to load; may be null.
   * @return the snapshot index, or INVALID_LOG_INDEX if the snapshot is null
   *         or its file does not exist.
   */
  public synchronized long loadSnapshot(SingleFileSnapshotInfo snapshot)
      throws IOException {
    if (snapshot == null || !snapshot.getFile().getPath().toFile().exists()) {
      LOG.info("The snapshot file {} does not exist",
          snapshot == null ? null : snapshot.getFile());
      return RaftServerConstants.INVALID_LOG_INDEX;
    } else {
      LOG.info("Loading snapshot with t:{}, i:{}, file:{}", snapshot.getTerm(),
          snapshot.getIndex(), snapshot.getFile().getPath());
      final long endIndex = snapshot.getIndex();
      try (LogInputStream in = new LogInputStream(
          snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
        LogEntryProto entry;
        while ((entry = in.nextEntry()) != null) {
          list.add(entry);
          termIndexTracker.update(ServerProtoUtils.toTermIndex(entry));
        }
      }
      // The last loaded entry must be the one the snapshot claims to end at.
      Preconditions.checkState(
          !list.isEmpty() && endIndex == list.get(list.size() - 1).getIndex(),
          "endIndex=%s, list=%s", endIndex, list);
      this.endIndexLastCkpt = endIndex;
      termIndexTracker.init(snapshot.getTermIndex());
      this.storage.loadLatestSnapshot();
      return endIndex;
    }
  }

  /** @return a completed future with a fixed "query success" reply. */
  @Override
  public CompletableFuture<RaftClientReply> query(
      RaftClientRequest request) {
    return CompletableFuture.completedFuture(
        new RaftClientReply(request, new SimpleMessage("query success")));
  }

  /** Wraps the content of the request message into a new transaction context. */
  @Override
  public TransactionContext startTransaction(RaftClientRequest request)
      throws IOException {
    return new TransactionContext(this, request, SMLogEntryProto.newBuilder()
        .setData(request.getMessage().getContent())
        .build());
  }

  @Override
  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
    // do nothing
  }

  /** Stops the checkpointer loop and interrupts its thread. */
  @Override
  public void close() {
    lifeCycle.checkStateAndClose(() -> {
      running = false;
      checkpointer.interrupt();
    });
  }

  /** @return a copy of all entries currently held in the list. */
  public LogEntryProto[] getContent() {
    // A zero-length seed lets the synchronized toArray size the result itself.
    return list.toArray(new LogEntryProto[0]);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java new file mode 100644 index 0000000..31768e8 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.statemachine; + +import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; + +import org.apache.ratis.server.protocol.TermIndex; + +import com.google.common.base.Preconditions; + +/** + * Tracks the term index that is applied to the StateMachine for simple state machines with + * no concurrent snapshoting capabilities.
+ */
class TermIndexTracker {
  /** Initial value of the tracker: both components are INVALID_LOG_INDEX. */
  static final TermIndex INIT_TERMINDEX =
      TermIndex.newTermIndex(INVALID_LOG_INDEX, INVALID_LOG_INDEX);

  private TermIndex current = INIT_TERMINDEX;

  //TODO: developer note: everything is synchronized for now for convenience.

  /**
   * Initialize the tracker with a term index (likely from a snapshot).
   * Unlike {@link #update(TermIndex)}, no ordering check is performed.
   */
  public synchronized void init(TermIndex termIndex) {
    this.current = termIndex;
  }

  /** Put the tracker back to {@link #INIT_TERMINDEX}. */
  public synchronized void reset() {
    init(INIT_TERMINDEX);
  }

  /**
   * Update the tracker with a new TermIndex. It means that the StateMachine has
   * this index in memory.
   *
   * @throws IllegalArgumentException if termIndex is null or compares lower
   *         than the currently tracked value.
   */
  public synchronized void update(TermIndex termIndex) {
    // Two separate checks so that a failure says which condition was violated.
    Preconditions.checkArgument(termIndex != null, "termIndex == null");
    Preconditions.checkArgument(termIndex.compareTo(current) >= 0,
        "termIndex=%s is behind current=%s", termIndex, current);
    this.current = termIndex;
  }

  /**
   * Return latest term and index that is inserted to this tracker as an atomic
   * entity.
   */
  public synchronized TermIndex getLatestTermIndex() {
    return current;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java new file mode 100644 index 0000000..cdce568 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.statemachine; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.apache.log4j.Level; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerTestUtil; +import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; +import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.util.RaftUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +
/**
 * Test StateMachine related functionality
 */
public class TestStateMachine {
  static {
    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
  }

  public static final int NUM_SERVERS = 5;

  private final RaftProperties properties = new RaftProperties();
  {
    // TODO: fix and run with in-memory log. It fails with NPE
    properties.setBoolean(RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY, false);
  }

  private MiniRaftClusterWithSimulatedRpc cluster;

  @Rule
  public Timeout globalTimeout = new Timeout(60 * 1000);

  @Before
  public void setup() throws IOException {
  }

  /** Creates the cluster with the current properties and starts it. */
  private void startCluster() {
    cluster = new MiniRaftClusterWithSimulatedRpc(NUM_SERVERS, properties);
    Assert.assertNull(getCluster().getLeader());
    getCluster().start();
  }

  @After
  public void tearDown() {
    final MiniRaftCluster cluster = getCluster();
    // The cluster is only created by startCluster(), so it may still be null.
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  public MiniRaftClusterWithSimulatedRpc getCluster() {
    return cluster;
  }

  public RaftProperties getProperties() {
    return properties;
  }

  /**
   * State machine that attaches an increasing transaction id as the context
   * in startTransaction and checks in applyTransaction that the context is
   * present exactly where startTransaction has been called.
   */
  static class SMTransactionContext extends SimpleStateMachine4Testing {
    public static SMTransactionContext get(RaftServerImpl s) {
      return (SMTransactionContext)RaftServerTestUtil.getStateMachine(s);
    }

    /** First failure seen inside applyTransaction, if any. */
    final AtomicReference<Throwable> throwable = new AtomicReference<>(null);
    final AtomicLong transactions = new AtomicLong(0);
    final AtomicBoolean isLeader = new AtomicBoolean(false);
    final AtomicLong numApplied = new AtomicLong(0);
    final ConcurrentLinkedQueue<Long> applied = new ConcurrentLinkedQueue<>();

    @Override
    public TransactionContext startTransaction(RaftClientRequest request) throws IOException {
      // only leader will get this call
      isLeader.set(true);
      // send the next transaction id as the "context" from SM
      return new TransactionContext(this, request, SMLogEntryProto.newBuilder()
          .setData(request.getMessage().getContent())
          .build(), transactions.incrementAndGet());
    }

    @Override
    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
      try {
        assertTrue(trx.getLogEntry().isPresent());
        assertTrue(trx.getSMLogEntry().isPresent());
        Optional<Object> context = trx.getStateMachineContext();
        if (isLeader.get()) {
          assertTrue(trx.getClientRequest().isPresent());
          assertTrue(context.isPresent());
          assertTrue(context.get() instanceof Long);
          Long val = (Long)context.get();
          assertTrue(val <= transactions.get());
          applied.add(val);
        } else {
          assertFalse(trx.getClientRequest().isPresent());
          assertFalse(context.isPresent());
        }
        numApplied.incrementAndGet();
      } catch (Throwable t) {
        // Keep the first failure: it is the root cause, later ones are noise.
        throwable.compareAndSet(null, t);
      }
      return CompletableFuture.completedFuture(null);
    }

    /** Rethrows the failure recorded by applyTransaction, if there is one. */
    void rethrowIfException() throws Throwable {
      Throwable t = throwable.get();
      if (t != null) {
        throw t;
      }
    }
  }

  @Test
  public void testTransactionContextIsPassedBack() throws Throwable {
    // tests that the TrxContext set by the StateMachine in Leader is passed back to the SM
    properties.setClass(
        MiniRaftCluster.STATEMACHINE_CLASS_KEY,
        SMTransactionContext.class, StateMachine.class);
    startCluster();

    int numTrx = 100;
    final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numTrx);
    try(final RaftClient client = cluster.createClient("client", null)) {
      for (RaftTestUtil.SimpleMessage message : messages) {
        client.send(message);
      }
    }

    // TODO: there should be a better way to ensure all data is replicated and applied
    Thread.sleep(cluster.getMaxTimeout() + 100);

    for (RaftServerImpl raftServer : cluster.getServers()) {
      final SMTransactionContext sm = SMTransactionContext.get(raftServer);
      sm.rethrowIfException();
      assertEquals(numTrx, sm.numApplied.get());
    }

    // check leader
    RaftServerImpl raftServer = cluster.getLeader();
    Assert.assertNotNull("no leader", raftServer);
    // assert every transaction has obtained context in leader
    final SMTransactionContext sm = SMTransactionContext.get(raftServer);
    List<Long> ll = sm.applied.stream().collect(Collectors.toList());
    Collections.sort(ll);
    // expected value first, so that a failure message reads correctly
    assertEquals(ll.toString(), numTrx, ll.size());
    for (int i=0; i < numTrx; i++) {
      assertEquals(ll.toString(), Long.valueOf(i+1), ll.get(i));
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/resources/log4j.properties b/ratis-server/src/test/resources/log4j.properties new file mode 100644 index 0000000..ced0687 --- /dev/null +++ b/ratis-server/src/test/resources/log4j.properties @@ -0,0 +1,18 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
