http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogCache.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogCache.java b/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogCache.java deleted file mode 100644 index dde6c7a..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogCache.java +++ /dev/null @@ -1,255 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.storage; - -import org.apache.raft.RaftTestUtil.SimpleOperation; -import org.apache.raft.server.storage.RaftLogCache.TruncationSegments; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.util.ProtoUtils; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.Iterator; - -public class TestRaftLogCache { - private RaftLogCache cache; - - @Before - public void setup() { - cache = new RaftLogCache(); - } - - private LogSegment prepareLogSegment(long start, long end, boolean isOpen) { - LogSegment s = LogSegment.newOpenSegment(start); - for (long i = start; i <= end; i++) { - SimpleOperation m = new SimpleOperation("m" + i); - LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), - 0, i); - s.appendToOpenSegment(entry); - } - if (!isOpen) { - s.close(); - } - return s; - } - - private void checkCache(long start, long end, int segmentSize) { - Assert.assertEquals(start, cache.getStartIndex()); - Assert.assertEquals(end, cache.getEndIndex()); - - for (long index = start; index <= end; index++) { - LogEntryProto entry = cache.getEntry(index); - Assert.assertEquals(index, entry.getIndex()); - } - - long[] offsets = new long[]{start, start + 1, start + (end - start) / 2, - end - 1, end}; - for (long offset : offsets) { - checkCacheEntries(offset, (int) (end - offset + 1), end); - checkCacheEntries(offset, 1, end); - checkCacheEntries(offset, 20, end); - checkCacheEntries(offset, segmentSize, end); - checkCacheEntries(offset, segmentSize - 1, end); - } - } - - private void checkCacheEntries(long offset, int size, long end) { - LogEntryProto[] entries = cache.getEntries(offset, offset + size); - long realEnd = offset + size > end + 1 ? end + 1 : offset + size; - Assert.assertEquals(realEnd - offset, entries.length); - for (long i = offset; i < realEnd; i++) { - Assert.assertEquals(i, entries[(int) (i - offset)].getIndex()); - } - } - - @Test - public void testAddSegments() throws Exception { - LogSegment s1 = prepareLogSegment(1, 100, false); - cache.addSegment(s1); - checkCache(1, 100, 100); - - try { - LogSegment s = prepareLogSegment(102, 103, true); - cache.addSegment(s); - Assert.fail("should fail since there is gap between two segments"); - } catch (IllegalStateException ignored) { - } - - LogSegment s2 = prepareLogSegment(101, 200, true); - cache.addSegment(s2); - checkCache(1, 200, 100); - - try { - LogSegment s = prepareLogSegment(201, 202, true); - cache.addSegment(s); - Assert.fail("should fail since there is still an open segment in cache"); - } catch (IllegalStateException ignored) { - } - - cache.rollOpenSegment(false); - checkCache(1, 200, 100); - - try { - LogSegment s = prepareLogSegment(202, 203, true); - cache.addSegment(s); - Assert.fail("should fail since there is gap between two segments"); - } catch (IllegalStateException ignored) { - } - - LogSegment s3 = prepareLogSegment(201, 300, true); - cache.addSegment(s3); - Assert.assertNotNull(cache.getOpenSegment()); - checkCache(1, 300, 100); - - cache.rollOpenSegment(true); - Assert.assertNotNull(cache.getOpenSegment()); - checkCache(1, 300, 100); - } - - @Test - public void testAppendEntry() throws Exception { - LogSegment closedSegment = prepareLogSegment(0, 99, false); - cache.addSegment(closedSegment); - - final SimpleOperation m = new SimpleOperation("m"); - try { - LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), - 0, 0); - cache.appendEntry(entry); - Assert.fail("the open segment is null"); - } catch (IllegalStateException ignored) { - } - - LogSegment openSegment = prepareLogSegment(100, 100, true); - cache.addSegment(openSegment); - for (long index = 101; index < 200; index++) { - LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), - 0, index); - cache.appendEntry(entry); - } - - Assert.assertNotNull(cache.getOpenSegment()); - checkCache(0, 199, 100); - } - - @Test - public void testTruncate() throws Exception { - long start = 0; - for (int i = 0; i < 5; i++) { // 5 closed segments - LogSegment s = prepareLogSegment(start, start + 99, false); - cache.addSegment(s); - start += 100; - } - // add another open segment - LogSegment s = prepareLogSegment(start, start + 99, true); - cache.addSegment(s); - - long end = cache.getEndIndex(); - Assert.assertEquals(599, end); - int numOfSegments = 6; - // start truncation - for (int i = 0; i < 10; i++) { // truncate 10 times - // each time truncate 37 entries - end -= 37; - TruncationSegments ts = cache.truncate(end + 1); - checkCache(0, end, 100); - - // check TruncationSegments - int currentNum= (int) (end / 100 + 1); - if (currentNum < numOfSegments) { - Assert.assertEquals(1, ts.toDelete.length); - numOfSegments = currentNum; - } else { - Assert.assertEquals(0, ts.toDelete.length); - } - } - - // 230 entries remaining. truncate at the segment boundary - TruncationSegments ts = cache.truncate(200); - checkCache(0, 199, 100); - Assert.assertEquals(1, ts.toDelete.length); - Assert.assertEquals(200, ts.toDelete[0].startIndex); - Assert.assertEquals(229, ts.toDelete[0].endIndex); - Assert.assertEquals(0, ts.toDelete[0].targetLength); - Assert.assertFalse(ts.toDelete[0].isOpen); - Assert.assertNull(ts.toTruncate); - - // add another open segment and truncate it as a whole - LogSegment newOpen = prepareLogSegment(200, 249, true); - cache.addSegment(newOpen); - ts = cache.truncate(200); - checkCache(0, 199, 100); - Assert.assertEquals(1, ts.toDelete.length); - Assert.assertEquals(200, ts.toDelete[0].startIndex); - Assert.assertEquals(249, ts.toDelete[0].endIndex); - Assert.assertEquals(0, ts.toDelete[0].targetLength); - Assert.assertTrue(ts.toDelete[0].isOpen); - Assert.assertNull(ts.toTruncate); - - // add another open segment and truncate part of it - newOpen = prepareLogSegment(200, 249, true); - cache.addSegment(newOpen); - ts = cache.truncate(220); - checkCache(0, 219, 100); - Assert.assertNull(cache.getOpenSegment()); - Assert.assertEquals(0, ts.toDelete.length); - Assert.assertTrue(ts.toTruncate.isOpen); - Assert.assertEquals(219, ts.toTruncate.newEndIndex); - Assert.assertEquals(200, ts.toTruncate.startIndex); - Assert.assertEquals(249, ts.toTruncate.endIndex); - } - - private void testIterator(long startIndex) { - Iterator<LogEntryProto> iterator = cache.iterator(startIndex); - LogEntryProto prev = null; - while (iterator.hasNext()) { - LogEntryProto entry = iterator.next(); - Assert.assertEquals(cache.getEntry(entry.getIndex()), entry); - if (prev != null) { - Assert.assertEquals(prev.getIndex() + 1, entry.getIndex()); - } - prev = entry; - } - if (startIndex <= cache.getEndIndex()) { - Assert.assertNotNull(prev); - Assert.assertEquals(cache.getEndIndex(), prev.getIndex()); - } - } - - @Test - public void testIterator() throws Exception { - long start = 0; - for (int i = 0; i < 2; i++) { // 2 closed segments - LogSegment s = prepareLogSegment(start, start + 99, false); - cache.addSegment(s); - start += 100; - } - // add another open segment - LogSegment s = prepareLogSegment(start, start + 99, true); - cache.addSegment(s); - - for (long startIndex = 0; startIndex < 300; startIndex += 50) { - testIterator(startIndex); - } - testIterator(299); - - Iterator<LogEntryProto> iterator = cache.iterator(300); - Assert.assertFalse(iterator.hasNext()); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogReadWrite.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogReadWrite.java b/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogReadWrite.java deleted file mode 100644 index fa17696..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogReadWrite.java +++ /dev/null @@ -1,266 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.storage; - -import org.apache.raft.RaftTestUtil; -import org.apache.raft.RaftTestUtil.SimpleOperation; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.ChecksumException; -import org.apache.raft.server.impl.RaftServerConstants; -import org.apache.raft.server.impl.RaftServerConstants.StartupOption; -import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.util.FileUtils; -import org.apache.raft.util.ProtoUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.raft.server.RaftServerConfigKeys.*; - -/** - * Test basic functionality of LogReader, LogInputStream, and LogOutputStream. - */ -public class TestRaftLogReadWrite { - private static final Logger LOG = LoggerFactory.getLogger(TestRaftLogReadWrite.class); - - private File storageDir; - private RaftProperties properties; - private int segmentMaxSize; - - @Before - public void setup() throws Exception { - storageDir = RaftTestUtil.getTestDir(TestRaftLogReadWrite.class); - properties = new RaftProperties(); - properties.set(RAFT_SERVER_STORAGE_DIR_KEY, - FileUtils.fileAsURI(storageDir).toString()); - } - - @After - public void tearDown() throws Exception { - if (storageDir != null) { - FileUtils.fullyDelete(storageDir.getParentFile()); - } - } - - private LogEntryProto[] readLog(File file, long startIndex, long endIndex, - boolean isOpen) throws IOException { - List<LogEntryProto> list = new ArrayList<>(); - try (LogInputStream in = - new LogInputStream(file, startIndex, endIndex, isOpen)) { - LogEntryProto entry; - while ((entry = in.nextEntry()) != null) { - list.add(entry); - } - } - return list.toArray(new LogEntryProto[list.size()]); - } - - private long writeMessages(LogEntryProto[] entries, LogOutputStream out) - throws IOException { - long size = 0; - for (int i = 0; i < entries.length; i++) { - SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); - final int s = entries[i].getSerializedSize(); - size += CodedOutputStream.computeUInt32SizeNoTag(s) + s + 4; - out.write(entries[i]); - } - return size; - } - - /** - * Test basic functionality: write several log entries, then read - */ - @Test - public void testReadWriteLog() throws IOException { - RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); - File openSegment = storage.getStorageDir().getOpenLogFile(0); - long size = SegmentedRaftLog.HEADER_BYTES.length; - - final LogEntryProto[] entries = new LogEntryProto[100]; - try (LogOutputStream out = - new LogOutputStream(openSegment, false, properties)) { - size += writeMessages(entries, out); - } finally { - storage.close(); - } - - Assert.assertEquals(size, openSegment.length()); - - LogEntryProto[] readEntries = readLog(openSegment, 0, - RaftServerConstants.INVALID_LOG_INDEX, true); - Assert.assertArrayEquals(entries, readEntries); - } - - @Test - public void testAppendLog() throws IOException { - RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); - File openSegment = storage.getStorageDir().getOpenLogFile(0); - LogEntryProto[] entries = new LogEntryProto[200]; - try (LogOutputStream out = - new LogOutputStream(openSegment, false, properties)) { - for (int i = 0; i < 100; i++) { - SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); - out.write(entries[i]); - } - } - - try (LogOutputStream out = - new LogOutputStream(openSegment, true, properties)) { - for (int i = 100; i < 200; i++) { - SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); - out.write(entries[i]); - } - } - - LogEntryProto[] readEntries = readLog(openSegment, 0, - RaftServerConstants.INVALID_LOG_INDEX, true); - Assert.assertArrayEquals(entries, readEntries); - - storage.close(); - } - - /** - * Simulate the scenario that the peer is shutdown without truncating - * log segment file padding. Make sure the reader can correctly handle this. - */ - @Test - public void testReadWithPadding() throws IOException { - RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); - File openSegment = storage.getStorageDir().getOpenLogFile(0); - long size = SegmentedRaftLog.HEADER_BYTES.length; - - LogEntryProto[] entries = new LogEntryProto[100]; - LogOutputStream out = new LogOutputStream(openSegment, false, properties); - size += writeMessages(entries, out); - out.flush(); - - // make sure the file contains padding - Assert.assertEquals(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT, - openSegment.length()); - - // check if the reader can correctly read the log file - LogEntryProto[] readEntries = readLog(openSegment, 0, - RaftServerConstants.INVALID_LOG_INDEX, true); - Assert.assertArrayEquals(entries, readEntries); - - out.close(); - Assert.assertEquals(size, openSegment.length()); - } - - /** - * corrupt the padding by inserting non-zero bytes. Make sure the reader - * throws exception. - */ - @Test - public void testReadWithCorruptPadding() throws IOException { - properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 4 * 1024 * 1024); - properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 16 * 1024 * 1024); - - RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); - File openSegment = storage.getStorageDir().getOpenLogFile(0); - - LogEntryProto[] entries = new LogEntryProto[10]; - LogOutputStream out = new LogOutputStream(openSegment, false, properties); - for (int i = 0; i < 10; i++) { - SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); - out.write(entries[i]); - } - out.flush(); - - // make sure the file contains padding - Assert.assertEquals(4 * 1024 * 1024, openSegment.length()); - - try (FileOutputStream fout = new FileOutputStream(openSegment, true)) { - ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{-1, 1}); - fout.getChannel() - .write(byteBuffer, 16 * 1024 * 1024 - 10); - } - - List<LogEntryProto> list = new ArrayList<>(); - try (LogInputStream in = new LogInputStream(openSegment, 0, - RaftServerConstants.INVALID_LOG_INDEX, true)) { - LogEntryProto entry; - while ((entry = in.nextEntry()) != null) { - list.add(entry); - } - Assert.fail("should fail since we corrupt the padding"); - } catch (IOException e) { - boolean findVerifyTerminator = false; - for (StackTraceElement s : e.getStackTrace()) { - if (s.getMethodName().equals("verifyTerminator")) { - findVerifyTerminator = true; - break; - } - } - Assert.assertTrue(findVerifyTerminator); - } - Assert.assertArrayEquals(entries, - list.toArray(new LogEntryProto[list.size()])); - } - - /** - * Test the log reader to make sure it can detect the checksum mismatch. - */ - @Test - public void testReadWithEntryCorruption() throws IOException { - RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); - File openSegment = storage.getStorageDir().getOpenLogFile(0); - try (LogOutputStream out = - new LogOutputStream(openSegment, false, properties)) { - for (int i = 0; i < 100; i++) { - LogEntryProto entry = ProtoUtils.toLogEntryProto( - new SimpleOperation("m" + i).getLogEntryContent(), 0, i); - out.write(entry); - } - } finally { - storage.close(); - } - - // corrupt the log file - try (RandomAccessFile raf = new RandomAccessFile(openSegment.getCanonicalFile(), - "rw")) { - raf.seek(100); - int correctValue = raf.read(); - raf.seek(100); - raf.write(correctValue + 1); - } - - try { - readLog(openSegment, 0, RaftServerConstants.INVALID_LOG_INDEX, true); - Assert.fail("The read of corrupted log file should fail"); - } catch (ChecksumException e) { - LOG.info("Caught ChecksumException as expected", e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogSegment.java b/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogSegment.java deleted file mode 100644 index 470f80f..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftLogSegment.java +++ /dev/null @@ -1,303 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.storage; - -import org.apache.raft.RaftTestUtil; -import org.apache.raft.RaftTestUtil.SimpleOperation; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.server.impl.RaftServerConstants.StartupOption; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto; -import org.apache.raft.util.FileUtils; -import org.apache.raft.util.ProtoUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static org.apache.raft.server.RaftServerConfigKeys.*; -import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX; -import static org.apache.raft.server.storage.LogSegment.getEntrySize; - -/** - * Test basic functionality of {@link LogSegment} - */ -public class TestRaftLogSegment { - private File storageDir; - private final RaftProperties properties = new RaftProperties(); - - @Before - public void setup() throws Exception { - storageDir = RaftTestUtil.getTestDir(TestRaftLogSegment.class); - properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, - storageDir.getCanonicalPath()); - } - - @After - public void tearDown() throws Exception { - if (storageDir != null) { - FileUtils.fullyDelete(storageDir.getParentFile()); - } - } - - private File prepareLog(boolean isOpen, long start, int size, long term) - throws IOException { - RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); - File file = isOpen ? storage.getStorageDir().getOpenLogFile(start) : - storage.getStorageDir().getClosedLogFile(start, start + size - 1); - - LogEntryProto[] entries = new LogEntryProto[size]; - try (LogOutputStream out = new LogOutputStream(file, false, properties)) { - for (int i = 0; i < size; i++) { - SimpleOperation op = new SimpleOperation("m" + i); - entries[i] = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), - term, i + start); - out.write(entries[i]); - } - } - storage.close(); - return file; - } - - private void checkLogSegment(LogSegment segment, long start, long end, - boolean isOpen, long totalSize, long term) { - Assert.assertEquals(start, segment.getStartIndex()); - Assert.assertEquals(end, segment.getEndIndex()); - Assert.assertEquals(isOpen, segment.isOpen()); - Assert.assertEquals(totalSize, segment.getTotalSize()); - - long offset = SegmentedRaftLog.HEADER_BYTES.length; - for (long i = start; i <= end; i++) { - LogSegment.LogRecord record = segment.getLogRecord(i); - Assert.assertEquals(i, record.entry.getIndex()); - Assert.assertEquals(term, record.entry.getTerm()); - Assert.assertEquals(offset, record.offset); - - offset += getEntrySize(record.entry); - } - } - - @Test - public void testLoadLogSegment() throws Exception { - // load an open segment - File openSegmentFile = prepareLog(true, 0, 100, 0); - LogSegment openSegment = LogSegment.loadSegment(openSegmentFile, 0, - INVALID_LOG_INDEX, true, null); - checkLogSegment(openSegment, 0, 99, true, openSegmentFile.length(), 0); - - // load a closed segment (1000-1099) - File closedSegmentFile = prepareLog(false, 1000, 100, 1); - LogSegment closedSegment = LogSegment.loadSegment(closedSegmentFile, 1000, - 1099, false, null); - checkLogSegment(closedSegment, 1000, 1099, false, - closedSegment.getTotalSize(), 1); - } - - @Test - public void testAppendEntries() throws Exception { - final long start = 1000; - LogSegment segment = LogSegment.newOpenSegment(start); - long size = SegmentedRaftLog.HEADER_BYTES.length; - final long max = 8 * 1024 * 1024; - checkLogSegment(segment, start, start - 1, true, size, 0); - - // append till full - long term = 0; - int i = 0; - List<LogEntryProto> list = new ArrayList<>(); - while (size < max) { - SimpleOperation op = new SimpleOperation("m" + i); - LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), - term, i++ + start); - size += getEntrySize(entry); - list.add(entry); - } - - segment.appendToOpenSegment(list.toArray(new LogEntryProto[list.size()])); - Assert.assertTrue(segment.getTotalSize() >= max); - checkLogSegment(segment, start, i - 1 + start, true, size, term); - } - - @Test - public void testAppendWithGap() throws Exception { - LogSegment segment = LogSegment.newOpenSegment(1000); - SimpleOperation op = new SimpleOperation("m"); - final SMLogEntryProto m = op.getLogEntryContent(); - try { - LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1001); - segment.appendToOpenSegment(entry); - Assert.fail("should fail since the entry's index needs to be 1000"); - } catch (Exception e) { - Assert.assertTrue(e instanceof IllegalArgumentException); - } - - LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1000); - segment.appendToOpenSegment(entry); - - try { - entry = ProtoUtils.toLogEntryProto(m, 0, 1002); - segment.appendToOpenSegment(entry); - Assert.fail("should fail since the entry's index needs to be 1001"); - } catch (Exception e) { - Assert.assertTrue(e instanceof IllegalArgumentException); - } - - LogEntryProto[] entries = new LogEntryProto[2]; - for (int i = 0; i < 2; i++) { - entries[i] = ProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2); - } - try { - segment.appendToOpenSegment(entries); - Assert.fail("should fail since there is gap between entries"); - } catch (Exception e) { - Assert.assertTrue(e instanceof IllegalArgumentException); - } - } - - @Test - public void testTruncate() throws Exception { - final long term = 1; - final long start = 1000; - LogSegment segment = LogSegment.newOpenSegment(start); - for (int i = 0; i < 100; i++) { - LogEntryProto entry = ProtoUtils.toLogEntryProto( - new SimpleOperation("m" + i).getLogEntryContent(), term, i + start); - segment.appendToOpenSegment(entry); - } - - // truncate an open segment (remove 1080~1099) - long newSize = segment.getLogRecord(start + 80).offset; - segment.truncate(start + 80); - Assert.assertEquals(80, segment.numOfEntries()); - checkLogSegment(segment, start, start + 79, false, newSize, term); - - // truncate a closed segment (remove 1050~1079) - newSize = segment.getLogRecord(start + 50).offset; - segment.truncate(start + 50); - Assert.assertEquals(50, segment.numOfEntries()); - checkLogSegment(segment, start, start + 49, false, newSize, term); - - // truncate all the remaining entries - segment.truncate(start); - Assert.assertEquals(0, segment.numOfEntries()); - checkLogSegment(segment, start, start - 1, false, - SegmentedRaftLog.HEADER_BYTES.length, term); - } - - private RaftProperties getProperties(long maxSegmentSize, - long preallocatedSize) { - RaftProperties p = new RaftProperties(); - p.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, - maxSegmentSize); - p.setLong(RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, - preallocatedSize); - return p; - } - - @Test - public void testPreallocateSegment() throws Exception { - RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); - final File file = storage.getStorageDir().getOpenLogFile(0); - final int[] maxSizes = new int[]{1024, 1025, 1024 * 1024 - 1, 1024 * 1024, - 1024 * 1024 + 1, 2 * 1024 * 1024 - 1, 2 * 1024 * 1024, - 2 * 1024 * 1024 + 1, 8 * 1024 * 1024}; - final int[] preallocated = new int[]{512, 1024, 1025, 1024 * 1024, - 1024 * 1024 + 1, 2 * 1024 * 1024}; - - // make sure preallocation is correct with different max/pre-allocated size - for (int max : maxSizes) { - for (int a : preallocated) { - try (LogOutputStream ignored = - new LogOutputStream(file, false, getProperties(max, a))) { - Assert.assertEquals(file.length(), Math.min(max, a)); - } - try (LogInputStream in = - new LogInputStream(file, 0, INVALID_LOG_INDEX, true)) { - LogEntryProto entry = in.nextEntry(); - Assert.assertNull(entry); - } - } - } - - // test the scenario where an entry's size is larger than the max size - final byte[] content = new byte[1024 * 2]; - Arrays.fill(content, (byte) 1); - final long size; - try (LogOutputStream out = new LogOutputStream(file, false, - getProperties(1024, 1024))) { - SimpleOperation op = new SimpleOperation(new String(content)); - LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), - 0, 0); - size = LogSegment.getEntrySize(entry); - out.write(entry); - } - Assert.assertEquals(file.length(), - size + SegmentedRaftLog.HEADER_BYTES.length); - try (LogInputStream in = new LogInputStream(file, 0, - INVALID_LOG_INDEX, true)) { - LogEntryProto entry = in.nextEntry(); - Assert.assertArrayEquals(content, - entry.getSmLogEntry().getData().toByteArray()); - Assert.assertNull(in.nextEntry()); - } - } - - /** - * Keep appending and check if pre-allocation is correct - */ - @Test - public void testPreallocationAndAppend() throws Exception { - final long max = 2 * 1024 * 1024; - properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, max); - properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 16 * 1024); - properties.setLong(RAFT_LOG_WRITE_BUFFER_SIZE_KEY, 10 * 1024); - RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); - final File file = storage.getStorageDir().getOpenLogFile(0); - - final byte[] content = new byte[1024]; - Arrays.fill(content, (byte) 1); - SimpleOperation op = new SimpleOperation(new String(content)); - LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(), - 0, 0); - final long entrySize = LogSegment.getEntrySize(entry); - - long totalSize = SegmentedRaftLog.HEADER_BYTES.length; - long preallocated = 16 * 1024; - try (LogOutputStream out = new LogOutputStream(file, false, properties)) { - Assert.assertEquals(preallocated, file.length()); - while (totalSize + entrySize < max) { - totalSize += entrySize; - out.write(entry); - if (totalSize > preallocated) { - Assert.assertEquals("totalSize==" + totalSize, - preallocated + 16 * 1024, file.length()); - preallocated += 16 * 1024; - } - } - } - - Assert.assertEquals(totalSize, file.length()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftStorage.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftStorage.java b/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftStorage.java deleted file mode 100644 index 1b14199..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/storage/TestRaftStorage.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.storage; - -import org.apache.raft.RaftTestUtil; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.io.nativeio.NativeIO; -import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.server.impl.RaftServerConstants.StartupOption; -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.server.storage.RaftStorageDirectory.StorageState; -import org.apache.raft.statemachine.SimpleStateMachineStorage; -import org.apache.raft.util.FileUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.internal.util.reflection.Whitebox; - -import java.io.File; -import java.io.IOException; -import java.util.concurrent.ThreadLocalRandom; - -/** - * Test RaftStorage and RaftStorageDirectory - */ -public class TestRaftStorage { - private File storageDir; - private final RaftProperties properties = new RaftProperties(); - - @Before - public void setup() throws Exception { - storageDir = RaftTestUtil.getTestDir(TestRaftStorage.class); - properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, - storageDir.getCanonicalPath()); - } - - @After - public void tearDown() throws Exception { - if (storageDir != null) { - FileUtils.fullyDelete(storageDir.getParentFile()); - } - } - - @Test - public void testNotExistent() throws IOException { - FileUtils.fullyDelete(storageDir); - - // we will format the empty directory - RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); - Assert.assertEquals(StorageState.NORMAL, storage.getState()); - - try { - new RaftStorage(properties, StartupOption.FORMAT).close(); - Assert.fail("the format should fail since the storage is still locked"); - } catch (IOException e) { - Assert.assertTrue(e.getMessage().contains("directory is already locked")); - } - - storage.close(); - FileUtils.fullyDelete(storageDir); - Assert.assertTrue(storageDir.createNewFile()); - try { - new RaftStorage(properties, StartupOption.REGULAR); - Assert.fail(); - } catch (IOException e) { - Assert.assertTrue( - e.getMessage().contains(StorageState.NON_EXISTENT.name())); - } - } - - /** - * make sure the RaftStorage format works - */ - @Test - public void testStorage() throws Exception { - RaftStorageDirectory sd = new RaftStorageDirectory(storageDir); - try { - StorageState state = sd.analyzeStorage(true); - Assert.assertEquals(StorageState.NOT_FORMATTED, state); - Assert.assertTrue(sd.isCurrentEmpty()); - } finally { - sd.unlock(); - } - - RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); - Assert.assertEquals(StorageState.NORMAL, storage.getState()); - storage.close(); - - Assert.assertEquals(StorageState.NORMAL, sd.analyzeStorage(false)); - File m = sd.getMetaFile(); - Assert.assertTrue(m.exists()); - MetaFile metaFile = new MetaFile(m); - Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm()); - Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor()); - - metaFile.set(123, "peer1"); - metaFile.readFile(); - Assert.assertEquals(123, metaFile.getTerm()); - Assert.assertEquals("peer1", metaFile.getVotedFor()); - - MetaFile metaFile2 = new MetaFile(m); - Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded")); - Assert.assertEquals(123, metaFile.getTerm()); - Assert.assertEquals("peer1", metaFile.getVotedFor()); - - // test format - storage = new RaftStorage(properties, StartupOption.FORMAT); - Assert.assertEquals(StorageState.NORMAL, storage.getState()); - metaFile = new MetaFile(sd.getMetaFile()); - Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm()); - Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor()); - storage.close(); - } - - @Test - public void testMetaFile() throws Exception { - RaftStorage storage = new RaftStorage(properties, StartupOption.FORMAT); - File m = storage.getStorageDir().getMetaFile(); - Assert.assertTrue(m.exists()); - MetaFile metaFile = new MetaFile(m); - Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm()); - Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor()); - - metaFile.set(123, "peer1"); - metaFile.readFile(); - Assert.assertEquals(123, metaFile.getTerm()); - Assert.assertEquals("peer1", metaFile.getVotedFor()); - - MetaFile metaFile2 = new MetaFile(m); - Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded")); - Assert.assertEquals(123, metaFile.getTerm()); - Assert.assertEquals("peer1", metaFile.getVotedFor()); - - storage.close(); - } - - /** - * check if RaftStorage deletes tmp metafile when startup - */ - @Test - public void testCleanMetaTmpFile() throws Exception { - RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); - Assert.assertEquals(StorageState.NORMAL, storage.getState()); - storage.close(); - - RaftStorageDirectory sd = new RaftStorageDirectory(storageDir); - File metaFile = sd.getMetaFile(); - NativeIO.renameTo(metaFile, sd.getMetaTmpFile()); - - Assert.assertEquals(StorageState.NOT_FORMATTED, sd.analyzeStorage(false)); - - try { - new RaftStorage(properties, StartupOption.REGULAR); - Assert.fail("should throw IOException since storage dir is not formatted"); - } catch (IOException e) { - Assert.assertTrue( - e.getMessage().contains(StorageState.NOT_FORMATTED.name())); - } - - // let the storage dir contain both raft-meta and raft-meta.tmp - new RaftStorage(properties, StartupOption.FORMAT).close(); - Assert.assertTrue(sd.getMetaFile().exists()); - Assert.assertTrue(sd.getMetaTmpFile().createNewFile()); - Assert.assertTrue(sd.getMetaTmpFile().exists()); - try { - storage = new RaftStorage(properties, StartupOption.REGULAR); - Assert.assertEquals(StorageState.NORMAL, storage.getState()); - Assert.assertFalse(sd.getMetaTmpFile().exists()); - Assert.assertTrue(sd.getMetaFile().exists()); - } finally { - storage.close(); - } - } - - @Test - public void testSnapshotFileName() throws Exception { - final long term = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); - final long index = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); - final String name = SimpleStateMachineStorage.getSnapshotFileName(term, index); - System.out.println("name = " + name); - final File file = new File(storageDir, name); - final TermIndex ti = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(file); - System.out.println("file = " + file); - Assert.assertEquals(term, ti.getTerm()); - Assert.assertEquals(index, ti.getIndex()); - System.out.println("ti = " + ti); - - final File foo = new File(storageDir, "foo"); - try { - SimpleStateMachineStorage.getTermIndexFromSnapshotFile(foo); - Assert.fail(); - } catch(IllegalArgumentException iae) { - System.out.println("Good " + iae); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/storage/TestSegmentedRaftLog.java b/raft-server/src/test/java/org/apache/raft/server/storage/TestSegmentedRaftLog.java deleted file mode 100644 index 264ba8e..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/storage/TestSegmentedRaftLog.java +++ /dev/null @@ -1,329 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.storage; - -import org.apache.log4j.Level; -import org.apache.raft.MiniRaftCluster; -import org.apache.raft.RaftTestUtil; -import org.apache.raft.RaftTestUtil.SimpleOperation; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.server.impl.ConfigurationManager; -import org.apache.raft.server.impl.RaftServerConstants; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.util.FileUtils; -import org.apache.raft.util.ProtoUtils; -import org.apache.raft.util.RaftUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.function.Supplier; - -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY; -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY; - -public class TestSegmentedRaftLog { - static { - RaftUtils.setLogLevel(RaftLogWorker.LOG, Level.DEBUG); - } - - private static final String peerId = "s0"; - - private static class SegmentRange { - final long start; - final long end; - final long term; - final boolean isOpen; - - SegmentRange(long s, long e, long term, boolean isOpen) { - this.start = s; - this.end = e; - this.term = term; - this.isOpen = isOpen; - } - } - - private File storageDir; - private RaftProperties properties; - private RaftStorage storage; - private final ConfigurationManager cm = new ConfigurationManager( - MiniRaftCluster.initConfiguration(MiniRaftCluster.generateIds(3, 0))); - - @Before - public void setup() throws Exception { - storageDir = RaftTestUtil.getTestDir(TestSegmentedRaftLog.class); - properties = new RaftProperties(); - properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, - storageDir.getCanonicalPath()); - storage = new RaftStorage(properties, RaftServerConstants.StartupOption.REGULAR); - } - - @After - public void tearDown() throws Exception { - if (storageDir != null) { - FileUtils.fullyDelete(storageDir.getParentFile()); - } - } - - private LogEntryProto[] prepareLog(List<SegmentRange> list) throws IOException { - List<LogEntryProto> entryList = new ArrayList<>(); - for (SegmentRange range : list) { - File file = range.isOpen ? - storage.getStorageDir().getOpenLogFile(range.start) : - storage.getStorageDir().getClosedLogFile(range.start, range.end); - - final int size = (int) (range.end - range.start + 1); - LogEntryProto[] entries = new LogEntryProto[size]; - try (LogOutputStream out = new LogOutputStream(file, false, properties)) { - for (int i = 0; i < size; i++) { - SimpleOperation m = new SimpleOperation("m" + (i + range.start)); - entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), - range.term, i + range.start); - out.write(entries[i]); - } - } - Collections.addAll(entryList, entries); - } - return entryList.toArray(new LogEntryProto[entryList.size()]); - } - - private List<SegmentRange> prepareRanges(int number, int segmentSize, - long startIndex) { - List<SegmentRange> list = new ArrayList<>(number); - for (int i = 0; i < number; i++) { - list.add(new SegmentRange(startIndex, startIndex + segmentSize - 1, i, - i == number - 1)); - startIndex += segmentSize; - } - return list; - } - - @Test - public void testLoadLogSegments() throws Exception { - // first generate log files - List<SegmentRange> ranges = prepareRanges(5, 100, 0); - LogEntryProto[] entries = prepareLog(ranges); - - // create RaftLog object and load log file - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); - // check if log entries are loaded correctly - for (LogEntryProto e : entries) { - LogEntryProto entry = raftLog.get(e.getIndex()); - Assert.assertEquals(e, entry); - } - - Assert.assertArrayEquals(entries, raftLog.getEntries(0, 500)); - Assert.assertEquals(entries[entries.length - 1], raftLog.getLastEntry()); - } - } - - List<LogEntryProto> prepareLogEntries(List<SegmentRange> slist, - Supplier<String> stringSupplier) { - List<LogEntryProto> eList = new ArrayList<>(); - for (SegmentRange range : slist) { - for (long index = range.start; index <= range.end; index++) { - SimpleOperation m = stringSupplier == null ? - new SimpleOperation("m" + index) : - new SimpleOperation(stringSupplier.get()); - eList.add(ProtoUtils.toLogEntryProto(m.getLogEntryContent(), - range.term, index)); - } - } - return eList; - } - - /** - * Append entry one by one and check if log state is correct. - */ - @Test - public void testAppendEntry() throws Exception { - List<SegmentRange> ranges = prepareRanges(5, 200, 0); - List<LogEntryProto> entries = prepareLogEntries(ranges, null); - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); - // append entries to the raftlog - entries.forEach(raftLog::appendEntry); - raftLog.logSync(); - } - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); - // check if the raft log is correct - checkEntries(raftLog, entries, 0, entries.size()); - } - } - - /** - * Keep appending entries, make sure the rolling is correct. - */ - @Test - public void testAppendAndRoll() throws Exception { - properties.setLong(RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY, 16 * 1024); - properties.setLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 128 * 1024); - - List<SegmentRange> ranges = prepareRanges(1, 1024, 0); - final byte[] content = new byte[1024]; - List<LogEntryProto> entries = prepareLogEntries(ranges, - () -> new String(content)); - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); - // append entries to the raftlog - entries.forEach(raftLog::appendEntry); - raftLog.logSync(); - } - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); - // check if the raft log is correct - checkEntries(raftLog, entries, 0, entries.size()); - Assert.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments()); - } - } - - @Test - public void testTruncate() throws Exception { - // prepare the log for truncation - List<SegmentRange> ranges = prepareRanges(5, 200, 0); - List<LogEntryProto> entries = prepareLogEntries(ranges, null); - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); - // append entries to the raftlog - entries.forEach(raftLog::appendEntry); - raftLog.logSync(); - } - - for (long fromIndex = 900; fromIndex >= 0; fromIndex -= 150) { - testTruncate(entries, fromIndex); - } - } - - private void testTruncate(List<LogEntryProto> entries, long fromIndex) - throws Exception { - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); - // truncate the log - raftLog.truncate(fromIndex); - raftLog.logSync(); - - checkEntries(raftLog, entries, 0, (int) fromIndex); - } - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); - // check if the raft log is correct - if (fromIndex > 0) { - Assert.assertEquals(entries.get((int) (fromIndex - 1)), - raftLog.getLastEntry()); - } else { - Assert.assertNull(raftLog.getLastEntry()); - } - checkEntries(raftLog, entries, 0, (int) fromIndex); - } - } - - private void checkEntries(RaftLog raftLog, List<LogEntryProto> expected, - int offset, int size) { - if (size > 0) { - for (int i = offset; i < size + offset; i++) { - LogEntryProto entry = raftLog.get(expected.get(i).getIndex()); - Assert.assertEquals(expected.get(i), entry); - } - LogEntryProto[] entriesFromLog = raftLog.getEntries( - expected.get(offset).getIndex(), - expected.get(offset + size - 1).getIndex() + 1); - LogEntryProto[] expectedArray = expected.subList(offset, offset + size) - .toArray(SegmentedRaftLog.EMPTY_LOGENTRY_ARRAY); - Assert.assertArrayEquals(expectedArray, entriesFromLog); - } - } - - /** - * Test append with inconsistent entries - */ - @Test - public void testAppendEntriesWithInconsistency() throws Exception { - // prepare the log for truncation - List<SegmentRange> ranges = prepareRanges(5, 200, 0); - List<LogEntryProto> entries = prepareLogEntries(ranges, null); - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); - // append entries to the raftlog - entries.forEach(raftLog::appendEntry); - raftLog.logSync(); - } - - // append entries whose first 100 entries are the same with existing log, - // and the next 100 are with different term - SegmentRange r1 = new SegmentRange(550, 599, 2, false); - SegmentRange r2 = new SegmentRange(600, 649, 3, false); - SegmentRange r3 = new SegmentRange(650, 749, 10, false); - List<LogEntryProto> newEntries = prepareLogEntries( - Arrays.asList(r1, r2, r3), null); - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); - raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()])); - raftLog.logSync(); - - checkEntries(raftLog, entries, 0, 650); - checkEntries(raftLog, newEntries, 100, 100); - Assert.assertEquals(newEntries.get(newEntries.size() - 1), - raftLog.getLastEntry()); - Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), - raftLog.getLatestFlushedIndex()); - } - - // load the raftlog again and check - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX); - checkEntries(raftLog, entries, 0, 650); - checkEntries(raftLog, newEntries, 100, 100); - Assert.assertEquals(newEntries.get(newEntries.size() - 1), - raftLog.getLastEntry()); - Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), - raftLog.getLatestFlushedIndex()); - - RaftLogCache cache = raftLog.getRaftLogCache(); - Assert.assertEquals(5, cache.getNumOfSegments()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java b/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java deleted file mode 100644 index fbdcb8b..0000000 --- a/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java +++ /dev/null @@ -1,215 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.statemachine; - -import org.apache.log4j.Level; -import org.apache.raft.MiniRaftCluster; -import org.apache.raft.RaftTestUtil; -import org.apache.raft.RaftTestUtil.SimpleMessage; -import org.apache.raft.client.RaftClient; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.protocol.SetConfigurationRequest; -import org.apache.raft.server.RaftServer; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.impl.RaftServerTestUtil; -import org.apache.raft.server.simulation.RequestHandler; -import org.apache.raft.server.storage.RaftLog; -import org.apache.raft.server.storage.RaftStorageDirectory; -import org.apache.raft.server.storage.RaftStorageDirectory.LogPathAndIndex; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.util.FileUtils; -import org.apache.raft.util.RaftUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY; -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY; -import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM; - -public abstract class RaftSnapshotBaseTest { - static { - RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); - RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG); - RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - } - - static final Logger LOG = LoggerFactory.getLogger(RaftSnapshotBaseTest.class); - private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10; - - static File getSnapshotFile(MiniRaftCluster cluster, int i) { - final RaftServerImpl leader = cluster.getLeader(); - final SimpleStateMachine4Testing sm = SimpleStateMachine4Testing.get(leader); - return sm.getStateMachineStorage().getSnapshotFile( - leader.getState().getCurrentTerm(), i); - } - - static void assertLeaderContent(MiniRaftCluster cluster) - throws InterruptedException { - final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster); - Assert.assertEquals(SNAPSHOT_TRIGGER_THRESHOLD * 2, - leader.getState().getLog().getLastCommittedIndex()); - final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader).getContent(); - - for (int i = 1; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { - Assert.assertEquals(i+1, entries[i].getIndex()); - Assert.assertArrayEquals( - new SimpleMessage("m" + i).getContent().toByteArray(), - entries[i].getSmLogEntry().getData().toByteArray()); - } - } - - private MiniRaftCluster cluster; - - public abstract MiniRaftCluster initCluster(int numServer, RaftProperties prop) - throws IOException; - - @Before - public void setup() throws IOException { - final RaftProperties prop = new RaftProperties(); - prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, - SimpleStateMachine4Testing.class, StateMachine.class); - prop.setLong(RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY, - SNAPSHOT_TRIGGER_THRESHOLD); - prop.setBoolean(RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY, true); - this.cluster = initCluster(1, prop); - cluster.start(); - } - - @After - public void tearDown() { - if (cluster != null) { - cluster.shutdown(); - } - } - - /** - * Keep generating writing traffic and make sure snapshots are taken. - * We then restart the whole raft peer and check if it can correctly load - * snapshots + raft log. - */ - @Test - public void testRestartPeer() throws Exception { - RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); - int i = 0; - try(final RaftClient client = cluster.createClient("client", leaderId)) { - for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { - RaftClientReply reply = client.send(new SimpleMessage("m" + i)); - Assert.assertTrue(reply.isSuccess()); - } - } - - // wait for the snapshot to be done - final File snapshotFile = getSnapshotFile(cluster, i); - - int retries = 0; - do { - Thread.sleep(1000); - } while (!snapshotFile.exists() && retries++ < 10); - - Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists()); - - // restart the peer and check if it can correctly load snapshot - cluster.restart(false); - try { - // 200 messages + two leader elections --> last committed = 201 - assertLeaderContent(cluster); - } finally { - cluster.shutdown(); - } - } - - /** - * Basic test for install snapshot: start a one node cluster and let it - * generate a snapshot. Then delete the log and restart the node, and add more - * nodes as followers. - */ - @Test - public void testBasicInstallSnapshot() throws Exception { - List<LogPathAndIndex> logs = new ArrayList<>(); - try { - RaftTestUtil.waitForLeader(cluster); - final String leaderId = cluster.getLeader().getId(); - - int i = 0; - try(final RaftClient client = cluster.createClient("client", leaderId)) { - for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { - RaftClientReply reply = client.send(new SimpleMessage("m" + i)); - Assert.assertTrue(reply.isSuccess()); - } - } - - // wait for the snapshot to be done - RaftStorageDirectory storageDirectory = cluster.getLeader().getState() - .getStorage().getStorageDir(); - final File snapshotFile = getSnapshotFile(cluster, i); - logs = storageDirectory.getLogSegmentFiles(); - - int retries = 0; - do { - Thread.sleep(1000); - } while (!snapshotFile.exists() && retries++ < 10); - - Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists()); - } finally { - cluster.shutdown(); - } - - // delete the log segments from the leader - for (LogPathAndIndex path : logs) { - FileUtils.deleteFile(path.path.toFile()); - } - - // restart the peer - LOG.info("Restarting the cluster"); - cluster.restart(false); - try { - assertLeaderContent(cluster); - - // generate some more traffic - try(final RaftClient client = cluster.createClient("client", - cluster.getLeader().getId())) { - Assert.assertTrue(client.send(new SimpleMessage("test")).isSuccess()); - } - - // add two more peers - MiniRaftCluster.PeerChanges change = cluster.addNewPeers( - new String[]{"s3", "s4"}, true); - // trigger setConfiguration - SetConfigurationRequest request = new SetConfigurationRequest("client", - cluster.getLeader().getId(), DEFAULT_SEQNUM, change.allPeersInNewConf); - LOG.info("Start changing the configuration: {}", request); - cluster.getLeader().setConfiguration(request); - - RaftServerTestUtil.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); - } finally { - cluster.shutdown(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java b/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java deleted file mode 100644 index b8dd3f3..0000000 --- a/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java +++ /dev/null @@ -1,245 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.statemachine; - -import com.google.common.base.Preconditions; -import org.apache.raft.RaftTestUtil.SimpleMessage; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.io.MD5Hash; -import org.apache.raft.protocol.Message; -import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.protocol.RaftClientRequest; -import org.apache.raft.server.impl.RaftServerConstants; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.impl.RaftServerTestUtil; -import org.apache.raft.server.impl.ServerProtoUtils; -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.server.storage.LogInputStream; -import org.apache.raft.server.storage.LogOutputStream; -import org.apache.raft.server.storage.RaftStorage; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto; -import org.apache.raft.util.Daemon; -import org.apache.raft.util.LifeCycle; -import org.apache.raft.util.MD5FileUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -/** - * A {@link StateMachine} implementation example that simply stores all the log - * entries in a list. Mainly used for test. - * - * For snapshot it simply merges all the log segments together. - */ -public class SimpleStateMachine4Testing extends BaseStateMachine { - static volatile int SNAPSHOT_THRESHOLD = 100; - static final Logger LOG = LoggerFactory.getLogger(SimpleStateMachine4Testing.class); - public static final String RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_KEY - = "raft.test.simple.state.machine.take.snapshot"; - public static final boolean RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_DEFAULT = false; - - public static SimpleStateMachine4Testing get(RaftServerImpl s) { - return (SimpleStateMachine4Testing)RaftServerTestUtil.getStateMachine(s); - } - - private final List<LogEntryProto> list = - Collections.synchronizedList(new ArrayList<>()); - private final Daemon checkpointer; - private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); - private final TermIndexTracker termIndexTracker = new TermIndexTracker(); - private final RaftProperties properties = new RaftProperties(); - - private volatile boolean running = true; - private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX; - - SimpleStateMachine4Testing() { - checkpointer = new Daemon(() -> { - while (running) { - try { - if (list.get(list.size() - 1).getIndex() - endIndexLastCkpt >= - SNAPSHOT_THRESHOLD) { - endIndexLastCkpt = takeSnapshot(); - } - try { - Thread.sleep(1000); - } catch (InterruptedException ignored) { - } - } catch (IOException ioe) { - LOG.warn("Received IOException in Checkpointer", ioe); - } - } - }); - } - - @Override - public synchronized void initialize(String id, RaftProperties properties, - RaftStorage raftStorage) throws IOException { - LOG.info("Initializing " + getClass().getSimpleName() + ":" + id); - lifeCycle.startAndTransition(() -> { - super.initialize(id, properties, raftStorage); - storage.init(raftStorage); - loadSnapshot(storage.findLatestSnapshot()); - - if (properties.getBoolean( - RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_KEY, - RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_DEFAULT)) { - checkpointer.start(); - } - }); - } - - @Override - public synchronized void pause() { - lifeCycle.transition(LifeCycle.State.PAUSING); - lifeCycle.transition(LifeCycle.State.PAUSED); - } - - @Override - public synchronized void reinitialize(String id, RaftProperties properties, - RaftStorage storage) throws IOException { - LOG.info("Reinitializing " + getClass().getSimpleName() + ":" + id); - initialize(id, properties, storage); - } - - @Override - public CompletableFuture<Message> applyTransaction(TransactionContext trx) { - LogEntryProto entry = trx.getLogEntry().get(); - Preconditions.checkNotNull(entry); - list.add(entry); - termIndexTracker.update(ServerProtoUtils.toTermIndex(entry)); - return CompletableFuture.completedFuture( - new SimpleMessage(entry.getIndex() + " OK")); - } - - @Override - public long takeSnapshot() throws IOException { - TermIndex termIndex = termIndexTracker.getLatestTermIndex(); - if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) { - return RaftServerConstants.INVALID_LOG_INDEX; - } - final long endIndex = termIndex.getIndex(); - - // TODO: snapshot should be written to a tmp file, then renamed - File snapshotFile = storage.getSnapshotFile(termIndex.getTerm(), - termIndex.getIndex()); - LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}", termIndex.getTerm(), - termIndex.getIndex(), snapshotFile); - try (LogOutputStream out = new LogOutputStream(snapshotFile, false, properties)) { - for (final LogEntryProto entry : list) { - if (entry.getIndex() > endIndex) { - break; - } else { - out.write(entry); - } - } - out.flush(); - } catch (IOException e) { - LOG.warn("Failed to take snapshot", e); - } - - try { - final MD5Hash digest = MD5FileUtil.computeMd5ForFile(snapshotFile); - MD5FileUtil.saveMD5File(snapshotFile, digest); - } catch (IOException e) { - LOG.warn("Hit IOException when computing MD5 for snapshot file " - + snapshotFile, e); - } - - try { - this.storage.loadLatestSnapshot(); - } catch (IOException e) { - LOG.warn("Hit IOException when loading latest snapshot for snapshot file " - + snapshotFile, e); - } - // TODO: purge log segments - return endIndex; - } - - @Override - public SimpleStateMachineStorage getStateMachineStorage() { - return storage; - } - - public synchronized long loadSnapshot(SingleFileSnapshotInfo snapshot) - throws IOException { - if (snapshot == null || !snapshot.getFile().getPath().toFile().exists()) { - LOG.info("The snapshot file {} does not exist", - snapshot == null ? null : snapshot.getFile()); - return RaftServerConstants.INVALID_LOG_INDEX; - } else { - LOG.info("Loading snapshot with t:{}, i:{}, file:{}", snapshot.getTerm(), - snapshot.getIndex(), snapshot.getFile().getPath()); - final long endIndex = snapshot.getIndex(); - try (LogInputStream in = new LogInputStream( - snapshot.getFile().getPath().toFile(), 0, endIndex, false)) { - LogEntryProto entry; - while ((entry = in.nextEntry()) != null) { - list.add(entry); - termIndexTracker.update(ServerProtoUtils.toTermIndex(entry)); - } - } - Preconditions.checkState( - !list.isEmpty() && endIndex == list.get(list.size() - 1).getIndex(), - "endIndex=%s, list=%s", endIndex, list); - this.endIndexLastCkpt = endIndex; - termIndexTracker.init(snapshot.getTermIndex()); - this.storage.loadLatestSnapshot(); - return endIndex; - } - } - - @Override - public CompletableFuture<RaftClientReply> query( - RaftClientRequest request) { - return CompletableFuture.completedFuture( - new RaftClientReply(request, new SimpleMessage("query success"))); - } - - @Override - public TransactionContext startTransaction(RaftClientRequest request) - throws IOException { - return new TransactionContext(this, request, SMLogEntryProto.newBuilder() - .setData(request.getMessage().getContent()) - .build()); - } - - @Override - public void notifyNotLeader(Collection<TransactionContext> pendingEntries) { - // do nothing - } - - @Override - public void close() { - lifeCycle.checkStateAndClose(() -> { - running = false; - checkpointer.interrupt(); - }); - } - - public LogEntryProto[] getContent() { - return list.toArray(new LogEntryProto[list.size()]); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java b/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java deleted file mode 100644 index b08fe11..0000000 --- a/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.statemachine; - -import com.google.common.base.Preconditions; -import org.apache.raft.server.protocol.TermIndex; - -import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX; - -/** - * Tracks the term index that is applied to the StateMachine for simple state machines with - * no concurrent snapshoting capabilities. - */ -class TermIndexTracker { - static final TermIndex INIT_TERMINDEX = - TermIndex.newTermIndex(INVALID_LOG_INDEX, INVALID_LOG_INDEX); - - private TermIndex current = INIT_TERMINDEX; - - //TODO: developer note: everything is synchronized for now for convenience. - - /** - * Initialize the tracker with a term index (likely from a snapshot). - */ - public synchronized void init(TermIndex termIndex) { - this.current = termIndex; - } - - public synchronized void reset() { - init(INIT_TERMINDEX); - } - - /** - * Update the tracker with a new TermIndex. It means that the StateMachine has - * this index in memory. - */ - public synchronized void update(TermIndex termIndex) { - Preconditions.checkArgument(termIndex != null && - termIndex.compareTo(current) >= 0); - this.current = termIndex; - } - - /** - * Return latest term and index that is inserted to this tracker as an atomic - * entity. - */ - public synchronized TermIndex getLatestTermIndex() { - return current; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java b/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java deleted file mode 100644 index 546bfb8..0000000 --- a/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.statemachine; - -import org.apache.log4j.Level; -import org.apache.raft.MiniRaftCluster; -import org.apache.raft.RaftTestUtil; -import org.apache.raft.client.RaftClient; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.Message; -import org.apache.raft.protocol.RaftClientRequest; -import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.impl.RaftServerTestUtil; -import org.apache.raft.server.simulation.MiniRaftClusterWithSimulatedRpc; -import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto; -import org.apache.raft.util.RaftUtils; -import org.junit.*; -import org.junit.rules.Timeout; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -import static org.junit.Assert.*; - -/** - * Test StateMachine related functionality - */ -public class TestStateMachine { - static { - RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - } - - public static final int NUM_SERVERS = 5; - - private final RaftProperties properties = new RaftProperties(); - { - // TODO: fix and run with in-memory log. It fails with NPE - properties.setBoolean(RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY, false); - } - - private MiniRaftClusterWithSimulatedRpc cluster; - - @Rule - public Timeout globalTimeout = new Timeout(60 * 1000); - - @Before - public void setup() throws IOException { - } - - private void startCluster() { - cluster = new MiniRaftClusterWithSimulatedRpc(NUM_SERVERS, properties); - Assert.assertNull(getCluster().getLeader()); - getCluster().start(); - } - - @After - public void tearDown() { - final MiniRaftCluster cluster = getCluster(); - if (cluster != null) { - cluster.shutdown(); - } - } - - public MiniRaftClusterWithSimulatedRpc getCluster() { - return cluster; - } - - public RaftProperties getProperties() { - return properties; - } - - static class SMTransactionContext extends SimpleStateMachine4Testing { - public static SMTransactionContext get(RaftServerImpl s) { - return (SMTransactionContext)RaftServerTestUtil.getStateMachine(s); - } - - AtomicReference<Throwable> throwable = new AtomicReference<>(null); - AtomicLong transactions = new AtomicLong(0); - AtomicBoolean isLeader = new AtomicBoolean(false); - AtomicLong numApplied = new AtomicLong(0); - ConcurrentLinkedQueue<Long> applied = new ConcurrentLinkedQueue<>(); - - @Override - public TransactionContext startTransaction(RaftClientRequest request) throws IOException { - // only leader will get this call - isLeader.set(true); - // send the next transaction id as the "context" from SM - return new TransactionContext(this, request, SMLogEntryProto.newBuilder() - .setData(request.getMessage().getContent()) - .build(), transactions.incrementAndGet()); - } - - @Override - public CompletableFuture<Message> applyTransaction(TransactionContext trx) { - try { - assertTrue(trx.getLogEntry().isPresent()); - assertTrue(trx.getSMLogEntry().isPresent()); - Optional<Object> context = trx.getStateMachineContext(); - if (isLeader.get()) { - assertTrue(trx.getClientRequest().isPresent()); - assertTrue(context.isPresent()); - assertTrue(context.get() instanceof Long); - Long val = (Long)context.get(); - assertTrue(val <= transactions.get()); - applied.add(val); - } else { - assertFalse(trx.getClientRequest().isPresent()); - assertFalse(context.isPresent()); - } - numApplied.incrementAndGet(); - } catch (Throwable t) { - throwable.set(t); - } - return CompletableFuture.completedFuture(null); - } - - void rethrowIfException() throws Throwable { - Throwable t = throwable.get(); - if (t != null) { - throw t; - } - } - } - - @Test - public void testTransactionContextIsPassedBack() throws Throwable { - // tests that the TrxContext set by the StateMachine in Leader is passed back to the SM - properties.setClass( - MiniRaftCluster.STATEMACHINE_CLASS_KEY, - SMTransactionContext.class, StateMachine.class); - startCluster(); - - int numTrx = 100; - final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numTrx); - try(final RaftClient client = cluster.createClient("client", null)) { - for (RaftTestUtil.SimpleMessage message : messages) { - client.send(message); - } - } - - // TODO: there eshould be a better way to ensure all data is replicated and applied - Thread.sleep(cluster.getMaxTimeout() + 100); - - for (RaftServerImpl raftServer : cluster.getServers()) { - final SMTransactionContext sm = SMTransactionContext.get(raftServer); - sm.rethrowIfException(); - assertEquals(numTrx, sm.numApplied.get()); - } - - // check leader - RaftServerImpl raftServer = cluster.getLeader(); - // assert every transaction has obtained context in leader - final SMTransactionContext sm = SMTransactionContext.get(raftServer); - List<Long> ll = sm.applied.stream().collect(Collectors.toList()); - Collections.sort(ll); - assertEquals(ll.toString(), ll.size(), numTrx); - for (int i=0; i < numTrx; i++) { - assertEquals(ll.toString(), Long.valueOf(i+1), ll.get(i)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/raft-server/src/test/resources/log4j.properties b/raft-server/src/test/resources/log4j.properties deleted file mode 100644 index ced0687..0000000 --- a/raft-server/src/test/resources/log4j.properties +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# log4j configuration used during build and unit tests - -log4j.rootLogger=info,stdout -log4j.threshold=ALL -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-client/pom.xml ---------------------------------------------------------------------- diff --git a/ratis-client/pom.xml b/ratis-client/pom.xml new file mode 100644 index 0000000..58d9817 --- /dev/null +++ b/ratis-client/pom.xml @@ -0,0 +1,54 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>ratis-project-dist</artifactId> + <groupId>org.apache.ratis</groupId> + <version>1.0-SNAPSHOT</version> + <relativePath>../ratis-project-dist</relativePath> + </parent> + + <artifactId>ratis-client</artifactId> + <name>Ratis Client</name> + + <dependencies> + <dependency> + <artifactId>ratis-proto-shaded</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + + <dependency> + <artifactId>ratis-common</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <scope>compile</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java new file mode 100644 index 0000000..e4e0b84 --- /dev/null +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.client; + +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftPeer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; + +/** A client who sends requests to a raft service. */ +public interface RaftClient extends Closeable { + Logger LOG = LoggerFactory.getLogger(RaftClient.class); + long DEFAULT_SEQNUM = 0; + + /** @return the id of this client. */ + String getId(); + + /** + * Send the given message to the raft service. + * The message may change the state of the service. + * For readonly messages, use {@link #sendReadOnly(Message)} instead. + */ + RaftClientReply send(Message message) throws IOException; + + /** Send the given readonly message to the raft service. */ + RaftClientReply sendReadOnly(Message message) throws IOException; + + /** Send set configuration request to the raft service. */ + RaftClientReply setConfiguration(RaftPeer[] peersInNewConf) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java new file mode 100644 index 0000000..e1e1593 --- /dev/null +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.client; + +public interface RaftClientConfigKeys { + String RAFT_RPC_TIMEOUT_MS_KEY = "raft.rpc.timeout.ms"; + int RAFT_RPC_TIMEOUT_MS_DEFAULT = 300; +}
