http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java deleted file mode 100644 index 7d9fdf5..0000000 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java +++ /dev/null @@ -1,272 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.server.storage; - -import org.apache.ratis.BaseTest; -import org.apache.ratis.RaftTestUtil.SimpleOperation; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.ChecksumException; -import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.RaftServerConstants; -import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; -import org.apache.ratis.server.impl.ServerProtoUtils; -import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.util.FileUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * Test basic functionality of LogReader, LogInputStream, and LogOutputStream. - */ -public class TestRaftLogReadWrite extends BaseTest { - private File storageDir; - private long segmentMaxSize; - private long preallocatedSize; - private int bufferSize; - - @Before - public void setup() throws Exception { - storageDir = getTestDir(); - RaftProperties properties = new RaftProperties(); - RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(storageDir)); - this.segmentMaxSize = - RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); - this.preallocatedSize = - RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); - this.bufferSize = - RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); - } - - @After - public void tearDown() throws Exception { - if (storageDir != null) { - FileUtils.deleteFully(storageDir.getParentFile()); - } - } - - private LogEntryProto[] readLog(File file, long startIndex, long endIndex, - boolean isOpen) throws IOException { - List<LogEntryProto> list = new ArrayList<>(); - try (LogInputStream in = - new LogInputStream(file, startIndex, endIndex, isOpen)) { - LogEntryProto entry; - while ((entry = in.nextEntry()) != null) { - list.add(entry); - } - } - return list.toArray(new LogEntryProto[list.size()]); - } - - private long writeMessages(LogEntryProto[] entries, LogOutputStream out) - throws IOException { - long size = 0; - for (int i = 0; i < entries.length; i++) { - SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); - final int s = entries[i].getSerializedSize(); - size += CodedOutputStream.computeUInt32SizeNoTag(s) + s + 4; - out.write(entries[i]); - } - return size; - } - - /** - * Test basic functionality: write several log entries, then read - */ - @Test - public void testReadWriteLog() throws IOException { - final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - File openSegment = storage.getStorageDir().getOpenLogFile(0); - long size = SegmentedRaftLogFormat.getHeaderLength(); - - final LogEntryProto[] entries = new LogEntryProto[100]; - try (LogOutputStream out = - new LogOutputStream(openSegment, false, segmentMaxSize, - preallocatedSize, bufferSize)) { - size += writeMessages(entries, out); - } finally { - storage.close(); - } - - Assert.assertEquals(size, openSegment.length()); - - LogEntryProto[] readEntries = readLog(openSegment, 0, - RaftServerConstants.INVALID_LOG_INDEX, true); - Assert.assertArrayEquals(entries, readEntries); - } - - @Test - public void testAppendLog() throws IOException { - final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - File openSegment = storage.getStorageDir().getOpenLogFile(0); - LogEntryProto[] entries = new LogEntryProto[200]; - try (LogOutputStream out = - new LogOutputStream(openSegment, false, segmentMaxSize, - preallocatedSize, bufferSize)) { - for (int i = 0; i < 100; i++) { - SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); - out.write(entries[i]); - } - } - - try (LogOutputStream out = - new LogOutputStream(openSegment, true, segmentMaxSize, - preallocatedSize, bufferSize)) { - for (int i = 100; i < 200; i++) { - SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); - out.write(entries[i]); - } - } - - LogEntryProto[] readEntries = readLog(openSegment, 0, - RaftServerConstants.INVALID_LOG_INDEX, true); - Assert.assertArrayEquals(entries, readEntries); - - storage.close(); - } - - /** - * Simulate the scenario that the peer is shutdown without truncating - * log segment file padding. Make sure the reader can correctly handle this. - */ - @Test - public void testReadWithPadding() throws IOException { - final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - File openSegment = storage.getStorageDir().getOpenLogFile(0); - long size = SegmentedRaftLogFormat.getHeaderLength(); - - LogEntryProto[] entries = new LogEntryProto[100]; - LogOutputStream out = new LogOutputStream(openSegment, false, - segmentMaxSize, preallocatedSize, bufferSize); - size += writeMessages(entries, out); - out.flush(); - - // make sure the file contains padding - Assert.assertEquals( - RaftServerConfigKeys.Log.PREALLOCATED_SIZE_DEFAULT.getSize(), - openSegment.length()); - - // check if the reader can correctly read the log file - LogEntryProto[] readEntries = readLog(openSegment, 0, - RaftServerConstants.INVALID_LOG_INDEX, true); - Assert.assertArrayEquals(entries, readEntries); - - out.close(); - Assert.assertEquals(size, openSegment.length()); - } - - /** - * corrupt the padding by inserting non-zero bytes. Make sure the reader - * throws exception. - */ - @Test - public void testReadWithCorruptPadding() throws IOException { - final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - File openSegment = storage.getStorageDir().getOpenLogFile(0); - - LogEntryProto[] entries = new LogEntryProto[10]; - LogOutputStream out = new LogOutputStream(openSegment, false, - 16 * 1024 * 1024, 4 * 1024 * 1024, bufferSize); - for (int i = 0; i < 10; i++) { - SimpleOperation m = new SimpleOperation("m" + i); - entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); - out.write(entries[i]); - } - out.flush(); - - // make sure the file contains padding - Assert.assertEquals(4 * 1024 * 1024, openSegment.length()); - - try (FileOutputStream fout = new FileOutputStream(openSegment, true)) { - ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{-1, 1}); - fout.getChannel() - .write(byteBuffer, 16 * 1024 * 1024 - 10); - } - - List<LogEntryProto> list = new ArrayList<>(); - try (LogInputStream in = new LogInputStream(openSegment, 0, - RaftServerConstants.INVALID_LOG_INDEX, true)) { - LogEntryProto entry; - while ((entry = in.nextEntry()) != null) { - list.add(entry); - } - Assert.fail("should fail since we corrupt the padding"); - } catch (IOException e) { - boolean findVerifyTerminator = false; - for (StackTraceElement s : e.getStackTrace()) { - if (s.getMethodName().equals("verifyTerminator")) { - findVerifyTerminator = true; - break; - } - } - Assert.assertTrue(findVerifyTerminator); - } - Assert.assertArrayEquals(entries, - list.toArray(new LogEntryProto[list.size()])); - } - - /** - * Test the log reader to make sure it can detect the checksum mismatch. - */ - @Test - public void testReadWithEntryCorruption() throws IOException { - RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - File openSegment = storage.getStorageDir().getOpenLogFile(0); - try (LogOutputStream out = - new LogOutputStream(openSegment, false, segmentMaxSize, - preallocatedSize, bufferSize)) { - for (int i = 0; i < 100; i++) { - LogEntryProto entry = ServerProtoUtils.toLogEntryProto( - new SimpleOperation("m" + i).getLogEntryContent(), 0, i); - out.write(entry); - } - } finally { - storage.close(); - } - - // corrupt the log file - try (RandomAccessFile raf = new RandomAccessFile(openSegment.getCanonicalFile(), - "rw")) { - raf.seek(100); - int correctValue = raf.read(); - raf.seek(100); - raf.write(correctValue + 1); - } - - try { - readLog(openSegment, 0, RaftServerConstants.INVALID_LOG_INDEX, true); - Assert.fail("The read of corrupted log file should fail"); - } catch (ChecksumException e) { - LOG.info("Caught ChecksumException as expected", e); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java deleted file mode 100644 index 270e279..0000000 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java +++ /dev/null @@ -1,372 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.server.storage; - -import org.apache.ratis.BaseTest; -import org.apache.ratis.RaftTestUtil.SimpleOperation; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; -import org.apache.ratis.server.impl.ServerProtoUtils; -import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; -import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; -import org.apache.ratis.util.FileUtils; -import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.SizeInBytes; -import org.apache.ratis.util.TraditionalBinaryPrefix; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; - -import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; -import static org.apache.ratis.server.storage.LogSegment.getEntrySize; - -/** - * Test basic functionality of {@link LogSegment} - */ -public class TestRaftLogSegment extends BaseTest { - private File storageDir; - private long segmentMaxSize; - private long preallocatedSize; - private int bufferSize; - - @Before - public void setup() throws Exception { - RaftProperties properties = new RaftProperties(); - storageDir = getTestDir(); - RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(storageDir)); - this.segmentMaxSize = - RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); - this.preallocatedSize = - RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); - this.bufferSize = - RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); - } - - @After - public void tearDown() throws Exception { - if (storageDir != null) { - FileUtils.deleteFully(storageDir.getParentFile()); - } - } - - File prepareLog(boolean isOpen, long startIndex, int numEntries, long term, boolean isLastEntryPartiallyWritten) - throws IOException { - if (!isOpen) { - Preconditions.assertTrue(!isLastEntryPartiallyWritten, "For closed log, the last entry cannot be partially written."); - } - RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - final File file = isOpen ? - storage.getStorageDir().getOpenLogFile(startIndex) : - storage.getStorageDir().getClosedLogFile(startIndex, startIndex + numEntries - 1); - - final LogEntryProto[] entries = new LogEntryProto[numEntries]; - try (LogOutputStream out = new LogOutputStream(file, false, - segmentMaxSize, preallocatedSize, bufferSize)) { - for (int i = 0; i < entries.length; i++) { - SimpleOperation op = new SimpleOperation("m" + i); - entries[i] = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i + startIndex); - out.write(entries[i]); - } - } - - if (isLastEntryPartiallyWritten) { - final int entrySize = size(entries[entries.length - 1]); - final int truncatedEntrySize = ThreadLocalRandom.current().nextInt(entrySize - 1) + 1; - // 0 < truncatedEntrySize < entrySize - final long fileLength = file.length(); - final long truncatedFileLength = fileLength - (entrySize - truncatedEntrySize); - LOG.info("truncate last entry: entry(size={}, truncated={}), file(length={}, truncated={})", - entrySize, truncatedEntrySize, fileLength, truncatedFileLength); - FileUtils.truncateFile(file, truncatedFileLength); - } - - storage.close(); - return file; - } - - static int size(LogEntryProto entry) { - final int n = entry.getSerializedSize(); - return CodedOutputStream.computeUInt32SizeNoTag(n) + n + 4; - } - - static void checkLogSegment(LogSegment segment, long start, long end, - boolean isOpen, long totalSize, long term) throws Exception { - Assert.assertEquals(start, segment.getStartIndex()); - Assert.assertEquals(end, segment.getEndIndex()); - Assert.assertEquals(isOpen, segment.isOpen()); - Assert.assertEquals(totalSize, segment.getTotalSize()); - - long offset = SegmentedRaftLogFormat.getHeaderLength(); - for (long i = start; i <= end; i++) { - LogSegment.LogRecord record = segment.getLogRecord(i); - LogRecordWithEntry lre = segment.getEntryWithoutLoading(i); - Assert.assertEquals(i, lre.getRecord().getTermIndex().getIndex()); - Assert.assertEquals(term, lre.getRecord().getTermIndex().getTerm()); - Assert.assertEquals(offset, record.getOffset()); - - LogEntryProto entry = lre.hasEntry() ? - lre.getEntry() : segment.loadCache(lre.getRecord()); - offset += getEntrySize(entry); - } - } - - @Test - public void testLoadLogSegment() throws Exception { - testLoadSegment(true, false); - } - - @Test - public void testLoadLogSegmentLastEntryPartiallyWritten() throws Exception { - testLoadSegment(true, true); - } - - @Test - public void testLoadCache() throws Exception { - testLoadSegment(false, false); - } - - @Test - public void testLoadCacheLastEntryPartiallyWritten() throws Exception { - testLoadSegment(false, true); - } - - private void testLoadSegment(boolean loadInitial, boolean isLastEntryPartiallyWritten) throws Exception { - // load an open segment - final File openSegmentFile = prepareLog(true, 0, 100, 0, isLastEntryPartiallyWritten); - RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 0, - INVALID_LOG_INDEX, true, loadInitial, null); - final int delta = isLastEntryPartiallyWritten? 1: 0; - checkLogSegment(openSegment, 0, 99 - delta, true, openSegmentFile.length(), 0); - storage.close(); - // for open segment we currently always keep log entries in the memory - Assert.assertEquals(0, openSegment.getLoadingTimes()); - - // load a closed segment (1000-1099) - final File closedSegmentFile = prepareLog(false, 1000, 100, 1, false); - LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile, - 1000, 1099, false, loadInitial, null); - checkLogSegment(closedSegment, 1000, 1099, false, - closedSegment.getTotalSize(), 1); - Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes()); - } - - @Test - public void testAppendEntries() throws Exception { - final long start = 1000; - LogSegment segment = LogSegment.newOpenSegment(null, start); - long size = SegmentedRaftLogFormat.getHeaderLength(); - final long max = 8 * 1024 * 1024; - checkLogSegment(segment, start, start - 1, true, size, 0); - - // append till full - long term = 0; - int i = 0; - List<LogEntryProto> list = new ArrayList<>(); - while (size < max) { - SimpleOperation op = new SimpleOperation("m" + i); - LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start); - size += getEntrySize(entry); - list.add(entry); - } - - segment.appendToOpenSegment(list.toArray(new LogEntryProto[list.size()])); - Assert.assertTrue(segment.getTotalSize() >= max); - checkLogSegment(segment, start, i - 1 + start, true, size, term); - } - - @Test - public void testAppendWithGap() throws Exception { - LogSegment segment = LogSegment.newOpenSegment(null, 1000); - SimpleOperation op = new SimpleOperation("m"); - final StateMachineLogEntryProto m = op.getLogEntryContent(); - try { - LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1001); - segment.appendToOpenSegment(entry); - Assert.fail("should fail since the entry's index needs to be 1000"); - } catch (IllegalStateException e) { - // the exception is expected. - } - - LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1000); - segment.appendToOpenSegment(entry); - - try { - entry = ServerProtoUtils.toLogEntryProto(m, 0, 1002); - segment.appendToOpenSegment(entry); - Assert.fail("should fail since the entry's index needs to be 1001"); - } catch (IllegalStateException e) { - // the exception is expected. - } - - LogEntryProto[] entries = new LogEntryProto[2]; - for (int i = 0; i < 2; i++) { - entries[i] = ServerProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2); - } - try { - segment.appendToOpenSegment(entries); - Assert.fail("should fail since there is gap between entries"); - } catch (IllegalStateException e) { - // the exception is expected. - } - } - - @Test - public void testTruncate() throws Exception { - final long term = 1; - final long start = 1000; - LogSegment segment = LogSegment.newOpenSegment(null, start); - for (int i = 0; i < 100; i++) { - LogEntryProto entry = ServerProtoUtils.toLogEntryProto( - new SimpleOperation("m" + i).getLogEntryContent(), term, i + start); - segment.appendToOpenSegment(entry); - } - - // truncate an open segment (remove 1080~1099) - long newSize = segment.getLogRecord(start + 80).getOffset(); - segment.truncate(start + 80); - Assert.assertEquals(80, segment.numOfEntries()); - checkLogSegment(segment, start, start + 79, false, newSize, term); - - // truncate a closed segment (remove 1050~1079) - newSize = segment.getLogRecord(start + 50).getOffset(); - segment.truncate(start + 50); - Assert.assertEquals(50, segment.numOfEntries()); - checkLogSegment(segment, start, start + 49, false, newSize, term); - - // truncate all the remaining entries - segment.truncate(start); - Assert.assertEquals(0, segment.numOfEntries()); - checkLogSegment(segment, start, start - 1, false, - SegmentedRaftLogFormat.getHeaderLength(), term); - } - - @Test - public void testPreallocateSegment() throws Exception { - RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - final File file = storage.getStorageDir().getOpenLogFile(0); - final int[] maxSizes = new int[]{1024, 1025, 1024 * 1024 - 1, 1024 * 1024, - 1024 * 1024 + 1, 2 * 1024 * 1024 - 1, 2 * 1024 * 1024, - 2 * 1024 * 1024 + 1, 8 * 1024 * 1024}; - final int[] preallocated = new int[]{512, 1024, 1025, 1024 * 1024, - 1024 * 1024 + 1, 2 * 1024 * 1024}; - - // make sure preallocation is correct with different max/pre-allocated size - for (int max : maxSizes) { - for (int a : preallocated) { - try (LogOutputStream ignored = - new LogOutputStream(file, false, max, a, bufferSize)) { - Assert.assertEquals("max=" + max + ", a=" + a, file.length(), Math.min(max, a)); - } - try (LogInputStream in = - new LogInputStream(file, 0, INVALID_LOG_INDEX, true)) { - LogEntryProto entry = in.nextEntry(); - Assert.assertNull(entry); - } - } - } - - // test the scenario where an entry's size is larger than the max size - final byte[] content = new byte[1024 * 2]; - Arrays.fill(content, (byte) 1); - final long size; - try (LogOutputStream out = new LogOutputStream(file, false, - 1024, 1024, bufferSize)) { - SimpleOperation op = new SimpleOperation(new String(content)); - LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0); - size = LogSegment.getEntrySize(entry); - out.write(entry); - } - Assert.assertEquals(file.length(), - size + SegmentedRaftLogFormat.getHeaderLength()); - try (LogInputStream in = new LogInputStream(file, 0, - INVALID_LOG_INDEX, true)) { - LogEntryProto entry = in.nextEntry(); - Assert.assertArrayEquals(content, - entry.getStateMachineLogEntry().getLogData().toByteArray()); - Assert.assertNull(in.nextEntry()); - } - } - - /** - * Keep appending and check if pre-allocation is correct - */ - @Test - public void testPreallocationAndAppend() throws Exception { - final SizeInBytes max = SizeInBytes.valueOf(2, TraditionalBinaryPrefix.MEGA); - RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - final File file = storage.getStorageDir().getOpenLogFile(0); - - final byte[] content = new byte[1024]; - Arrays.fill(content, (byte) 1); - SimpleOperation op = new SimpleOperation(new String(content)); - LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0); - final long entrySize = LogSegment.getEntrySize(entry); - - long totalSize = SegmentedRaftLogFormat.getHeaderLength(); - long preallocated = 16 * 1024; - try (LogOutputStream out = new LogOutputStream(file, false, - max.getSize(), 16 * 1024, 10 * 1024)) { - Assert.assertEquals(preallocated, file.length()); - while (totalSize + entrySize < max.getSize()) { - totalSize += entrySize; - out.write(entry); - if (totalSize > preallocated) { - Assert.assertEquals("totalSize==" + totalSize, - preallocated + 16 * 1024, file.length()); - preallocated += 16 * 1024; - } - } - } - - Assert.assertEquals(totalSize, file.length()); - } - - @Test - public void testZeroSizeInProgressFile() throws Exception { - final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - final File file = storage.getStorageDir().getOpenLogFile(0); - storage.close(); - - // create zero size in-progress file - LOG.info("file: " + file); - Assert.assertTrue(file.createNewFile()); - final Path path = file.toPath(); - Assert.assertTrue(Files.exists(path)); - Assert.assertEquals(0, Files.size(path)); - - // getLogSegmentFiles should remove it. - final List<RaftStorageDirectory.LogPathAndIndex> logs = storage.getStorageDir().getLogSegmentFiles(); - Assert.assertEquals(0, logs.size()); - Assert.assertFalse(Files.exists(path)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java deleted file mode 100644 index 4a26f8c..0000000 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.server.storage; - -import org.apache.ratis.BaseTest; -import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; -import org.apache.ratis.server.protocol.TermIndex; -import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState; -import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; -import org.apache.ratis.util.FileUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.internal.util.reflection.Whitebox; - -import java.io.File; -import java.io.IOException; -import java.util.concurrent.ThreadLocalRandom; - -/** - * Test RaftStorage and RaftStorageDirectory - */ -public class TestRaftStorage extends BaseTest { - private File storageDir; - - @Before - public void setup() throws Exception { - storageDir = getTestDir(); - } - - @After - public void tearDown() throws Exception { - if (storageDir != null) { - FileUtils.deleteFully(storageDir.getParentFile()); - } - } - - @Test - public void testNotExistent() throws IOException { - FileUtils.deleteFully(storageDir); - - // we will format the empty directory - RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - Assert.assertEquals(StorageState.NORMAL, storage.getState()); - - try { - new RaftStorage(storageDir, StartupOption.FORMAT).close(); - Assert.fail("the format should fail since the storage is still locked"); - } catch (IOException e) { - Assert.assertTrue(e.getMessage().contains("directory is already locked")); - } - - storage.close(); - FileUtils.deleteFully(storageDir); - Assert.assertTrue(storageDir.createNewFile()); - try { - new RaftStorage(storageDir, StartupOption.REGULAR); - Assert.fail(); - } catch (IOException e) { - Assert.assertTrue( - e.getMessage().contains(StorageState.NON_EXISTENT.name())); - } - } - - /** - * make sure the RaftStorage format works - */ - @Test - public void testStorage() throws Exception { - RaftStorageDirectory sd = new RaftStorageDirectory(storageDir); - try { - StorageState state = sd.analyzeStorage(true); - Assert.assertEquals(StorageState.NOT_FORMATTED, state); - Assert.assertTrue(sd.isCurrentEmpty()); - } finally { - sd.unlock(); - } - - RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - Assert.assertEquals(StorageState.NORMAL, storage.getState()); - storage.close(); - - Assert.assertEquals(StorageState.NORMAL, sd.analyzeStorage(false)); - File m = sd.getMetaFile(); - Assert.assertTrue(m.exists()); - MetaFile metaFile = new MetaFile(m); - Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm()); - Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor()); - - metaFile.set(123, "peer1"); - metaFile.readFile(); - Assert.assertEquals(123, metaFile.getTerm()); - Assert.assertEquals("peer1", metaFile.getVotedFor()); - - MetaFile metaFile2 = new MetaFile(m); - Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded")); - Assert.assertEquals(123, metaFile.getTerm()); - Assert.assertEquals("peer1", metaFile.getVotedFor()); - - // test format - storage = new RaftStorage(storageDir, StartupOption.FORMAT); - Assert.assertEquals(StorageState.NORMAL, storage.getState()); - metaFile = new MetaFile(sd.getMetaFile()); - Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm()); - Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor()); - storage.close(); - } - - @Test - public void testMetaFile() throws Exception { - RaftStorage storage = new RaftStorage(storageDir, StartupOption.FORMAT); - File m = storage.getStorageDir().getMetaFile(); - Assert.assertTrue(m.exists()); - MetaFile metaFile = new MetaFile(m); - Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm()); - Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor()); - - metaFile.set(123, "peer1"); - metaFile.readFile(); - Assert.assertEquals(123, metaFile.getTerm()); - Assert.assertEquals("peer1", metaFile.getVotedFor()); - - MetaFile metaFile2 = new MetaFile(m); - Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded")); - Assert.assertEquals(123, metaFile.getTerm()); - Assert.assertEquals("peer1", metaFile.getVotedFor()); - - storage.close(); - } - - /** - * check if RaftStorage deletes tmp metafile when startup - */ - @Test - public void testCleanMetaTmpFile() throws Exception { - RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); - Assert.assertEquals(StorageState.NORMAL, storage.getState()); - storage.close(); - - RaftStorageDirectory sd = new RaftStorageDirectory(storageDir); - File metaFile = sd.getMetaFile(); - FileUtils.move(metaFile, sd.getMetaTmpFile()); - - Assert.assertEquals(StorageState.NOT_FORMATTED, sd.analyzeStorage(false)); - - try { - new RaftStorage(storageDir, StartupOption.REGULAR); - Assert.fail("should throw IOException since storage dir is not formatted"); - } catch (IOException e) { - Assert.assertTrue( - e.getMessage().contains(StorageState.NOT_FORMATTED.name())); - } - - // let the storage dir contain both raft-meta and raft-meta.tmp - new RaftStorage(storageDir, StartupOption.FORMAT).close(); - Assert.assertTrue(sd.getMetaFile().exists()); - Assert.assertTrue(sd.getMetaTmpFile().createNewFile()); - Assert.assertTrue(sd.getMetaTmpFile().exists()); - try { - storage = new RaftStorage(storageDir, StartupOption.REGULAR); - Assert.assertEquals(StorageState.NORMAL, storage.getState()); - Assert.assertFalse(sd.getMetaTmpFile().exists()); - Assert.assertTrue(sd.getMetaFile().exists()); - } finally { - storage.close(); - } - } - - @Test - public void testSnapshotFileName() throws Exception { - final long term = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); - final long index = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); - final String name = SimpleStateMachineStorage.getSnapshotFileName(term, index); - System.out.println("name = " + name); - final File file = new File(storageDir, name); - final TermIndex ti = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(file); - System.out.println("file = " + file); - Assert.assertEquals(term, ti.getTerm()); - Assert.assertEquals(index, ti.getIndex()); - System.out.println("ti = " + ti); - - final File foo = new File(storageDir, "foo"); - try { - SimpleStateMachineStorage.getTermIndexFromSnapshotFile(foo); - Assert.fail(); - } catch(IllegalArgumentException iae) { - System.out.println("Good " + iae); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java deleted file mode 100644 index bcbfa73..0000000 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ /dev/null @@ -1,524 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.server.storage; - -import org.apache.log4j.Level; -import org.apache.ratis.BaseTest; -import org.apache.ratis.RaftTestUtil.SimpleOperation; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.protocol.TimeoutIOException; -import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.RaftServerConstants; -import org.apache.ratis.server.impl.RetryCacheTestUtil; -import org.apache.ratis.server.impl.RetryCache; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.ServerProtoUtils; -import org.apache.ratis.server.protocol.TermIndex; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.statemachine.SimpleStateMachine4Testing; -import org.apache.ratis.statemachine.StateMachine; -import org.apache.ratis.statemachine.impl.BaseStateMachine; -import org.apache.ratis.util.ExitUtils; -import org.apache.ratis.util.FileUtils; -import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.LogUtils; -import org.apache.ratis.util.SizeInBytes; -import org.apache.ratis.util.TimeDuration; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestSegmentedRaftLog extends BaseTest { - static { - LogUtils.setLogLevel(RaftLogWorker.LOG, Level.DEBUG); - } - - private static final RaftPeerId peerId = RaftPeerId.valueOf("s0"); - - static class SegmentRange { - final long start; - final long end; - final long term; - final boolean isOpen; - - SegmentRange(long s, long e, long term, boolean isOpen) { - this.start = s; - this.end = e; - this.term = term; - this.isOpen = isOpen; - } - } - - private File storageDir; - private RaftProperties properties; - private RaftStorage storage; - private long segmentMaxSize; - private long preallocatedSize; - private int bufferSize; - - @Before - public void setup() throws Exception { - storageDir = getTestDir(); - properties = new RaftProperties(); - RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(storageDir)); - storage = new RaftStorage(storageDir, RaftServerConstants.StartupOption.REGULAR); - this.segmentMaxSize = - RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); - this.preallocatedSize = - RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); - this.bufferSize = - RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); - } - - @After - public void tearDown() throws Exception { - if (storageDir != null) { - FileUtils.deleteFully(storageDir.getParentFile()); - } - } - - private LogEntryProto[] prepareLog(List<SegmentRange> list) throws IOException { - List<LogEntryProto> entryList = new ArrayList<>(); - for (SegmentRange range : list) { - File file = range.isOpen ? - storage.getStorageDir().getOpenLogFile(range.start) : - storage.getStorageDir().getClosedLogFile(range.start, range.end); - - final int size = (int) (range.end - range.start + 1); - LogEntryProto[] entries = new LogEntryProto[size]; - try (LogOutputStream out = new LogOutputStream(file, false, - segmentMaxSize, preallocatedSize, bufferSize)) { - for (int i = 0; i < size; i++) { - SimpleOperation m = new SimpleOperation("m" + (i + range.start)); - entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, i + range.start); - out.write(entries[i]); - } - } - Collections.addAll(entryList, entries); - } - return entryList.toArray(new LogEntryProto[entryList.size()]); - } - - static List<SegmentRange> prepareRanges(int startTerm, int endTerm, int segmentSize, - long startIndex) { - List<SegmentRange> list = new ArrayList<>(endTerm - startTerm); - for (int i = startTerm; i < endTerm; i++) { - list.add(new SegmentRange(startIndex, startIndex + segmentSize - 1, i, - i == endTerm - 1)); - startIndex += segmentSize; - } - return list; - } - - private LogEntryProto getLastEntry(SegmentedRaftLog raftLog) - throws IOException { - return raftLog.get(raftLog.getLastEntryTermIndex().getIndex()); - } - - @Test - public void testLoadLogSegments() throws Exception { - // first generate log files - List<SegmentRange> ranges = prepareRanges(0, 5, 100, 0); - LogEntryProto[] entries = prepareLog(ranges); - - // create RaftLog object and load log file - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - // check if log entries are loaded correctly - for (LogEntryProto e : entries) { - LogEntryProto entry = raftLog.get(e.getIndex()); - Assert.assertEquals(e, entry); - } - - TermIndex[] termIndices = raftLog.getEntries(0, 500); - LogEntryProto[] entriesFromLog = Arrays.stream(termIndices) - .map(ti -> { - try { - return raftLog.get(ti.getIndex()); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .toArray(LogEntryProto[]::new); - Assert.assertArrayEquals(entries, entriesFromLog); - Assert.assertEquals(entries[entries.length - 1], getLastEntry(raftLog)); - } - } - - static List<LogEntryProto> prepareLogEntries(List<SegmentRange> slist, - Supplier<String> stringSupplier) { - List<LogEntryProto> eList = new ArrayList<>(); - for (SegmentRange range : slist) { - prepareLogEntries(range, stringSupplier, false, eList); - } - return eList; - } - - static List<LogEntryProto> prepareLogEntries(SegmentRange range, - Supplier<String> stringSupplier, boolean hasStataMachineData, List<LogEntryProto> eList) { - for(long index = range.start; index <= range.end; index++) { - eList.add(prepareLogEntry(range.term, index, stringSupplier, hasStataMachineData)); - } - return eList; - } - - static LogEntryProto prepareLogEntry(long term, long index, Supplier<String> stringSupplier, boolean hasStataMachineData) { - final SimpleOperation m = stringSupplier == null? - new SimpleOperation("m" + index, hasStataMachineData): - new SimpleOperation(stringSupplier.get(), hasStataMachineData); - return ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), term, index); - } - - /** - * Append entry one by one and check if log state is correct. - */ - @Test - public void testAppendEntry() throws Exception { - List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0); - List<LogEntryProto> entries = prepareLogEntries(ranges, null); - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - // append entries to the raftlog - entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); - } - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - // check if the raft log is correct - checkEntries(raftLog, entries, 0, entries.size()); - } - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - TermIndex lastTermIndex = raftLog.getLastEntryTermIndex(); - IllegalStateException ex = null; - try { - // append entry fails if append entry term is lower than log's last entry term - raftLog.appendEntry(LogEntryProto.newBuilder(entries.get(0)) - .setTerm(lastTermIndex.getTerm() - 1) - .setIndex(lastTermIndex.getIndex() + 1).build()); - } catch (IllegalStateException e) { - ex = e; - } - Assert.assertTrue(ex.getMessage().contains("term less than RaftLog's last term")); - try { - // append entry fails if difference between append entry index and log's last entry index is greater than 1 - raftLog.appendEntry(LogEntryProto.newBuilder(entries.get(0)) - .setTerm(lastTermIndex.getTerm()) - .setIndex(lastTermIndex.getIndex() + 2).build()); - } catch (IllegalStateException e) { - ex = e; - } - Assert.assertTrue(ex.getMessage().contains("and RaftLog's last index " + lastTermIndex.getIndex() + " greater than 1")); - } - } - - /** - * Keep appending entries, make sure the rolling is correct. - */ - @Test - public void testAppendAndRoll() throws Exception { - RaftServerConfigKeys.Log.setPreallocatedSize(properties, SizeInBytes.valueOf("16KB")); - RaftServerConfigKeys.Log.setSegmentSizeMax(properties, SizeInBytes.valueOf("128KB")); - - List<SegmentRange> ranges = prepareRanges(0, 1, 1024, 0); - final byte[] content = new byte[1024]; - List<LogEntryProto> entries = prepareLogEntries(ranges, - () -> new String(content)); - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - // append entries to the raftlog - entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); - } - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - // check if the raft log is correct - checkEntries(raftLog, entries, 0, entries.size()); - Assert.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments()); - } - } - - @Test - public void testTruncate() throws Exception { - // prepare the log for truncation - List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0); - List<LogEntryProto> entries = prepareLogEntries(ranges, null); - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - // append entries to the raftlog - entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); - } - - for (long fromIndex = 900; fromIndex >= 0; fromIndex -= 150) { - testTruncate(entries, fromIndex); - } - } - - private void testTruncate(List<LogEntryProto> entries, long fromIndex) - throws Exception { - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - // truncate the log - raftLog.truncate(fromIndex).join(); - - - checkEntries(raftLog, entries, 0, (int) fromIndex); - } - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, null, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - // check if the raft log is correct - if (fromIndex > 0) { - Assert.assertEquals(entries.get((int) (fromIndex - 1)), - getLastEntry(raftLog)); - } else { - Assert.assertNull(raftLog.getLastEntryTermIndex()); - } - checkEntries(raftLog, entries, 0, (int) fromIndex); - } - } - - private void checkEntries(RaftLog raftLog, List<LogEntryProto> expected, - int offset, int size) throws IOException { - if (size > 0) { - for (int i = offset; i < size + offset; i++) { - LogEntryProto entry = raftLog.get(expected.get(i).getIndex()); - Assert.assertEquals(expected.get(i), entry); - } - TermIndex[] termIndices = raftLog.getEntries( - expected.get(offset).getIndex(), - expected.get(offset + size - 1).getIndex() + 1); - LogEntryProto[] entriesFromLog = Arrays.stream(termIndices) - .map(ti -> { - try { - return raftLog.get(ti.getIndex()); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .toArray(LogEntryProto[]::new); - LogEntryProto[] expectedArray = expected.subList(offset, offset + size) - .stream().toArray(LogEntryProto[]::new); - Assert.assertArrayEquals(expectedArray, entriesFromLog); - } - } - - private void checkFailedEntries(List<LogEntryProto> entries, long fromIndex, RetryCache retryCache) { - for (int i = 0; i < entries.size(); i++) { - if (i < fromIndex) { - RetryCacheTestUtil.assertFailure(retryCache, entries.get(i), false); - } else { - RetryCacheTestUtil.assertFailure(retryCache, entries.get(i), true); - } - } - } - - /** - * Test append with inconsistent entries - */ - @Test - public void testAppendEntriesWithInconsistency() throws Exception { - // prepare the log for truncation - List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0); - List<LogEntryProto> entries = prepareLogEntries(ranges, null); - - RaftServerImpl server = mock(RaftServerImpl.class); - RetryCache retryCache = RetryCacheTestUtil.createRetryCache(); - when(server.getRetryCache()).thenReturn(retryCache); - doCallRealMethod().when(server).failClientRequest(any(LogEntryProto.class)); - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, server, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - entries.stream().forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, entry)); - // append entries to the raftlog - entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); - } - - // append entries whose first 100 entries are the same with existing log, - // and the next 100 are with different term - SegmentRange r1 = new SegmentRange(550, 599, 2, false); - SegmentRange r2 = new SegmentRange(600, 649, 3, false); - SegmentRange r3 = new SegmentRange(650, 749, 10, false); - List<LogEntryProto> newEntries = prepareLogEntries( - Arrays.asList(r1, r2, r3), null); - - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, server, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()])).forEach(CompletableFuture::join); - - checkFailedEntries(entries, 650, retryCache); - checkEntries(raftLog, entries, 0, 650); - checkEntries(raftLog, newEntries, 100, 100); - Assert.assertEquals(newEntries.get(newEntries.size() - 1), - getLastEntry(raftLog)); - Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), - raftLog.getLatestFlushedIndex()); - } - - // load the raftlog again and check - try (SegmentedRaftLog raftLog = - new SegmentedRaftLog(peerId, server, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - checkEntries(raftLog, entries, 0, 650); - checkEntries(raftLog, newEntries, 100, 100); - Assert.assertEquals(newEntries.get(newEntries.size() - 1), - getLastEntry(raftLog)); - Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), - raftLog.getLatestFlushedIndex()); - - RaftLogCache cache = raftLog.getRaftLogCache(); - Assert.assertEquals(5, cache.getNumOfSegments()); - } - } - - @Test - public void testSegmentedRaftLogStateMachineData() throws Exception { - final SegmentRange range = new SegmentRange(0, 10, 1, true); - final List<LogEntryProto> entries = prepareLogEntries(range, null, true, new ArrayList<>()); - - final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing(); - try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, null, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - - int next = 0; - long flush = -1; - assertIndices(raftLog, flush, next); - raftLog.appendEntry(entries.get(next++)); - assertIndices(raftLog, flush, next); - raftLog.appendEntry(entries.get(next++)); - assertIndices(raftLog, flush, next); - raftLog.appendEntry(entries.get(next++)); - assertIndicesMultipleAttempts(raftLog, flush += 3, next); - - sm.blockFlushStateMachineData(); - raftLog.appendEntry(entries.get(next++)); - { - sm.blockWriteStateMachineData(); - final Thread t = startAppendEntryThread(raftLog, entries.get(next++)); - TimeUnit.SECONDS.sleep(1); - Assert.assertTrue(t.isAlive()); - sm.unblockWriteStateMachineData(); - t.join(); - } - assertIndices(raftLog, flush, next); - TimeUnit.SECONDS.sleep(1); - assertIndices(raftLog, flush, next); - sm.unblockFlushStateMachineData(); - assertIndicesMultipleAttempts(raftLog, flush + 2, next); - } - } - - @Test - public void testSegmentedRaftLogStateMachineDataTimeoutIOException() throws Exception { - RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true); - final TimeDuration syncTimeout = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); - RaftServerConfigKeys.Log.StateMachineData.setSyncTimeout(properties, syncTimeout); - final int numRetries = 2; - RaftServerConfigKeys.Log.StateMachineData.setSyncTimeoutRetry(properties, numRetries); - ExitUtils.disableSystemExit(); - - final LogEntryProto entry = prepareLogEntry(0, 0, null, true); - final StateMachine sm = new BaseStateMachine() { - @Override - public CompletableFuture<?> writeStateMachineData(LogEntryProto entry) { - return new CompletableFuture<>(); // the future never completes - } - }; - - try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, null, storage, -1, properties)) { - raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - raftLog.appendEntry(entry); // RaftLogWorker should catch TimeoutIOException - - JavaUtils.attempt(() -> { - final ExitUtils.ExitException exitException = ExitUtils.getFirstExitException(); - Objects.requireNonNull(exitException, "exitException == null"); - Assert.assertEquals(TimeoutIOException.class, exitException.getCause().getClass()); - }, 3*numRetries, syncTimeout, "RaftLogWorker should catch TimeoutIOException and exit", LOG); - } - } - - static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) { - final Thread t = new Thread(() -> raftLog.appendEntry(entry)); - t.start(); - return t; - } - - void assertIndices(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) { - LOG.info("assert expectedFlushIndex={}", expectedFlushIndex); - Assert.assertEquals(expectedFlushIndex, raftLog.getLatestFlushedIndex()); - LOG.info("assert expectedNextIndex={}", expectedNextIndex); - Assert.assertEquals(expectedNextIndex, raftLog.getNextIndex()); - } - - void assertIndicesMultipleAttempts(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) throws Exception { - JavaUtils.attempt(() -> assertIndices(raftLog, expectedFlushIndex, expectedNextIndex), - 10, 100, "assertIndices", LOG); - } - - @Test - public void testSegmentedRaftLogFormatInternalHeader() throws Exception { - testFailureCase("testSegmentedRaftLogFormatInternalHeader", - () -> SegmentedRaftLogFormat.applyHeaderTo(header -> { - LOG.info("header = " + new String(header, StandardCharsets.UTF_8)); - header[0] += 1; // try changing the internal header - LOG.info("header' = " + new String(header, StandardCharsets.UTF_8)); - return null; - }), IllegalStateException.class); - - // reset the header - SegmentedRaftLogFormat.applyHeaderTo(header -> { - LOG.info("header' = " + new String(header, StandardCharsets.UTF_8)); - header[0] -= 1; // try changing the internal header - LOG.info("header'' = " + new String(header, StandardCharsets.UTF_8)); - return null; - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java deleted file mode 100644 index a4dc88a..0000000 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.statemachine; - -import org.apache.log4j.Level; -import org.apache.ratis.BaseTest; -import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.RaftTestUtil; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.RaftServerProxy; -import org.apache.ratis.server.impl.RaftServerTestUtil; -import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; -import org.apache.ratis.util.LogUtils; -import org.junit.*; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -import static org.junit.Assert.*; - -/** - * Test StateMachine related functionality - */ -public class TestStateMachine extends BaseTest implements MiniRaftClusterWithSimulatedRpc.FactoryGet { - static { - LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - } - - public static final int NUM_SERVERS = 3; - - static class SMTransactionContext extends SimpleStateMachine4Testing { - public static SMTransactionContext get(RaftServerImpl s) { - return (SMTransactionContext)s.getStateMachine(); - } - - AtomicReference<Throwable> throwable = new AtomicReference<>(null); - AtomicLong transactions = new AtomicLong(0); - AtomicBoolean isLeader = new AtomicBoolean(false); - AtomicLong numApplied = new AtomicLong(0); - ConcurrentLinkedQueue<Long> applied = new ConcurrentLinkedQueue<>(); - - @Override - public TransactionContext startTransaction(RaftClientRequest request) { - // only leader will get this call - isLeader.set(true); - // send the next transaction id as the "context" from SM - return TransactionContext.newBuilder() - .setStateMachine(this) - .setClientRequest(request) - .setStateMachineContext(transactions.incrementAndGet()) - .build(); - } - - @Override - public CompletableFuture<Message> applyTransaction(TransactionContext trx) { - try { - assertNotNull(trx.getLogEntry()); - assertNotNull(trx.getStateMachineLogEntry()); - Object context = trx.getStateMachineContext(); - if (isLeader.get()) { - assertNotNull(trx.getClientRequest()); - assertNotNull(context); - assertTrue(context instanceof Long); - Long val = (Long)context; - assertTrue(val <= transactions.get()); - applied.add(val); - } else { - assertNull(trx.getClientRequest()); - assertNull(context); - } - numApplied.incrementAndGet(); - } catch (Throwable t) { - throwable.set(t); - } - return CompletableFuture.completedFuture(null); - } - - void rethrowIfException() throws Throwable { - Throwable t = throwable.get(); - if (t != null) { - throw t; - } - } - } - - @Test - public void testTransactionContextIsPassedBack() throws Throwable { - runTestTransactionContextIsPassedBack(false); - } - - @Test - public void testTransactionContextIsPassedBackUseMemory() throws Throwable { - runTestTransactionContextIsPassedBack(true); - } - - void runTestTransactionContextIsPassedBack(boolean useMemory) throws Throwable { - final RaftProperties properties = new RaftProperties(); - properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SMTransactionContext.class, StateMachine.class); - RaftServerConfigKeys.Log.setUseMemory(properties, useMemory); - - try(MiniRaftClusterWithSimulatedRpc cluster = getFactory().newCluster(NUM_SERVERS, properties)) { - cluster.start(); - runTestTransactionContextIsPassedBack(cluster); - } - } - - static void runTestTransactionContextIsPassedBack(MiniRaftCluster cluster) throws Throwable { - // tests that the TrxContext set by the StateMachine in Leader is passed back to the SM - int numTrx = 100; - final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numTrx); - try(final RaftClient client = cluster.createClient()) { - for (RaftTestUtil.SimpleMessage message : messages) { - client.send(message); - } - } - - // TODO: there eshould be a better way to ensure all data is replicated and applied - Thread.sleep(cluster.getMaxTimeout() + 100); - - for (RaftServerImpl raftServer : cluster.iterateServerImpls()) { - final SMTransactionContext sm = SMTransactionContext.get(raftServer); - sm.rethrowIfException(); - assertEquals(numTrx, sm.numApplied.get()); - } - - // check leader - RaftServerImpl raftServer = cluster.getLeader(); - // assert every transaction has obtained context in leader - final SMTransactionContext sm = SMTransactionContext.get(raftServer); - List<Long> ll = sm.applied.stream().collect(Collectors.toList()); - Collections.sort(ll); - assertEquals(ll.toString(), ll.size(), numTrx); - for (int i=0; i < numTrx; i++) { - assertEquals(ll.toString(), Long.valueOf(i+1), ll.get(i)); - } - } - - @Test - public void testStateMachineRegistry() throws Throwable { - final Map<RaftGroupId, StateMachine> registry = new ConcurrentHashMap<>(); - registry.put(RaftGroupId.randomId(), new SimpleStateMachine4Testing()); - registry.put(RaftGroupId.randomId(), new SMTransactionContext()); - - try(MiniRaftClusterWithSimulatedRpc cluster = newCluster(0)) { - cluster.setStateMachineRegistry(registry::get); - - final RaftPeerId id = RaftPeerId.valueOf("s0"); - cluster.putNewServer(id, null, true); - cluster.start(); - - for(RaftGroupId gid : registry.keySet()) { - final RaftGroup newGroup = RaftGroup.valueOf(gid, cluster.getPeers()); - LOG.info("add new group: " + newGroup); - final RaftClient client = cluster.createClient(newGroup); - for(RaftPeer p : newGroup.getPeers()) { - client.groupAdd(newGroup, p.getId()); - } - } - - final RaftServerProxy proxy = cluster.getServer(id); - for(Map.Entry<RaftGroupId, StateMachine> e: registry.entrySet()) { - final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(proxy, e.getKey()); - Assert.assertSame(e.getValue(), impl.getStateMachine()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-server/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/resources/log4j.properties b/ratis-server/src/test/resources/log4j.properties deleted file mode 100644 index ced0687..0000000 --- a/ratis-server/src/test/resources/log4j.properties +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# log4j configuration used during build and unit tests - -log4j.rootLogger=info,stdout -log4j.threshold=ALL -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/pom.xml ---------------------------------------------------------------------- diff --git a/ratis-test/pom.xml b/ratis-test/pom.xml new file mode 100644 index 0000000..9a70ff9 --- /dev/null +++ b/ratis-test/pom.xml @@ -0,0 +1,92 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>ratis</artifactId> + <groupId>org.apache.ratis</groupId> + <version>0.3.0-SNAPSHOT</version> + </parent> + + <artifactId>ratis-test</artifactId> + <name>Apache Ratis Test</name> + + <dependencies> + <dependency> + <artifactId>ratis-common</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-common</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <artifactId>ratis-client</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + + <dependency> + <artifactId>ratis-server</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-server</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <artifactId>ratis-netty</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-netty</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <artifactId>ratis-grpc</artifactId> + <groupId>org.apache.ratis</groupId> + </dependency> + <dependency> + <artifactId>ratis-grpc</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java new file mode 100644 index 0000000..55bcdfc --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import org.apache.log4j.Level; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; +import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.TimeDuration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Test Raft Server Leader election timeout detection and notification to state machine. + */ +public class TestRaftServerLeaderElectionTimeout extends BaseTest { + static { + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + + public static final int NUM_SERVERS = 3; + + protected static final RaftProperties properties = new RaftProperties(); + + private final MiniRaftClusterWithSimulatedRpc cluster = MiniRaftClusterWithSimulatedRpc + .FACTORY.newCluster(NUM_SERVERS, getProperties()); + + public RaftProperties getProperties() { + RaftServerConfigKeys + .setLeaderElectionTimeout(properties, TimeDuration.valueOf(1, TimeUnit.SECONDS)); + properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + SimpleStateMachine4Testing.class, StateMachine.class); + return properties; + } + + @Before + public void setup() throws IOException { + Assert.assertNull(cluster.getLeader()); + cluster.start(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testLeaderElectionDetection() throws Exception { + RaftTestUtil.waitForLeader(cluster); + long leaderElectionTimeout = RaftServerConfigKeys. + leaderElectionTimeout(cluster.getProperties()).toInt(TimeUnit.MILLISECONDS); + + RaftServerImpl healthyFollower = cluster.getFollowers().get(1); + RaftServerImpl failedFollower = cluster.getFollowers().get(0); + // fail the leader and one of the followers to that quorum is not present + // for next leader election to succeed. + cluster.killServer(failedFollower.getId()); + cluster.killServer(cluster.getLeader().getId()); + + // Wait to ensure that leader election is triggered and also state machine callback is triggered + Thread.sleep( leaderElectionTimeout * 2); + + RaftProtos.RoleInfoProto roleInfoProto = + SimpleStateMachine4Testing.get(healthyFollower).getLeaderElectionTimeoutInfo(); + Assert.assertNotNull(roleInfoProto); + + Assert.assertEquals(roleInfoProto.getRole(), RaftProtos.RaftPeerRole.CANDIDATE); + Assert.assertTrue(roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() > leaderElectionTimeout); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java new file mode 100644 index 0000000..96a164e --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import org.apache.log4j.Level; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; +import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.TimeDuration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Test Raft Server Slowness detection and notification to Leader's statemachine. + */ +//TODO: fix StateMachine.notifySlowness(..); see RATIS-370 +@Ignore +public class TestRaftServerSlownessDetection extends BaseTest { + static { + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + } + + public static final int NUM_SERVERS = 3; + + protected static final RaftProperties properties = new RaftProperties(); + + private final MiniRaftClusterWithSimulatedRpc cluster = MiniRaftClusterWithSimulatedRpc + .FACTORY.newCluster(NUM_SERVERS, getProperties()); + + public RaftProperties getProperties() { + RaftServerConfigKeys.Rpc + .setSlownessTimeout(properties, TimeDuration.valueOf(1, TimeUnit.SECONDS)); + properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + SimpleStateMachine4Testing.class, StateMachine.class); + return properties; + } + + @Before + public void setup() throws IOException { + Assert.assertNull(cluster.getLeader()); + cluster.start(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testSlownessDetection() throws Exception { + RaftTestUtil.waitForLeader(cluster); + long slownessTimeout = RaftServerConfigKeys.Rpc + .slownessTimeout(cluster.getProperties()).toInt(TimeUnit.MILLISECONDS); + RaftServerImpl failedFollower = cluster.getFollowers().get(0); + + // fail the node and wait for the callback to be triggered + cluster.killServer(failedFollower.getId()); + Thread.sleep( slownessTimeout * 2); + + // Followers should not get any failed not notification + for (RaftServerImpl followerServer : cluster.getFollowers()) { + Assert.assertNull(SimpleStateMachine4Testing.get(followerServer).getSlownessInfo()); + } + // the leader should get notification that the follower has failed now + RaftProtos.RoleInfoProto roleInfoProto = + SimpleStateMachine4Testing.get(cluster.getLeader()).getSlownessInfo(); + Assert.assertNotNull(roleInfoProto); + + List<RaftProtos.ServerRpcProto> followers = + roleInfoProto.getLeaderInfo().getFollowerInfoList(); + //Assert that the node shutdown is lagging behind + for (RaftProtos.ServerRpcProto serverProto : followers) { + if (RaftPeerId.valueOf(serverProto.getId().getId()).equals(failedFollower.getId())) { + Assert.assertTrue(serverProto.getLastRpcElapsedTimeMs() > slownessTimeout); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java new file mode 100644 index 0000000..d481003 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import org.apache.ratis.retry.RetryPolicies; +import org.apache.ratis.retry.RetryPolicy; +import org.apache.ratis.util.TimeDuration; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + + + +public class TestRetryPolicy { + + @Test + public void testRetryMultipleTimesWithFixedSleep() { + RetryPolicy retryPolicy = RetryPolicies + .retryUpToMaximumCountWithFixedSleep(2, + TimeDuration.valueOf(1000L, TimeUnit.MILLISECONDS)); + boolean shouldRetry = retryPolicy.shouldRetry(1); + Assert.assertTrue(shouldRetry); + Assert.assertTrue(1000 == retryPolicy.getSleepTime().getDuration()); + Assert.assertFalse(retryPolicy.shouldRetry(2)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java new file mode 100644 index 0000000..657bfd1 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.server.impl.GroupManagementBaseTest; + +public class TestGroupManagementWithGrpc extends GroupManagementBaseTest { + @Override + public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { + return MiniRaftClusterWithGrpc.FACTORY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java new file mode 100644 index 0000000..eb08336 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.server.impl.BlockRequestHandlingInjection; +import org.apache.ratis.server.impl.LeaderElectionTests; +import org.junit.Test; + +public class TestLeaderElectionWithGrpc + extends LeaderElectionTests<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { + + @Override + @Test + public void testEnforceLeader() throws Exception { + super.testEnforceLeader(); + + MiniRaftClusterWithGrpc.sendServerRequestInjection.clear(); + BlockRequestHandlingInjection.getInstance().unblockAll(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java new file mode 100644 index 0000000..614787e --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.RaftAsyncTests; + +public class TestRaftAsyncWithGrpc extends RaftAsyncTests<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java new file mode 100644 index 0000000..d2b71bc --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.RaftExceptionBaseTest; + +public class TestRaftExceptionWithGrpc + extends RaftExceptionBaseTest<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { +}
