http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java new file mode 100644 index 0000000..9b2932c --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.ratis.server.storage;

import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT;
import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.io.Charsets;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.impl.ConfigurationManager;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.CodeInjectionForTesting;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

/**
 * The RaftLog implementation that writes log entries into segmented files in
 * local disk.
 *
 * The max log segment size is read from the
 * {@code RAFT_LOG_SEGMENT_MAX_SIZE_KEY} property (falling back to
 * {@code RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT}). The real log segment size may
 * not be exactly equal to this limit. If a single log entry's size exceeds
 * the limit, the entry is still written into a single segment.
 *
 * There are two types of segments: closed segment and open segment. The former
 * is named as "log_startindex-endindex", the latter is named as
 * "log_inprogress_startindex".
 *
 * There can be multiple closed segments but there is at most one open segment.
 * When the open segment reaches the size limit, or the log term increases, we
 * close the open segment and start a new open segment. A closed segment cannot
 * be appended anymore, but it can be truncated in case that a follower's log is
 * inconsistent with the current leader.
 *
 * Every closed segment should be non-empty, i.e., it should contain at least
 * one entry.
 *
 * There should not be any gap between segments. The first segment may not start
 * from index 0 since there may be snapshots as log compaction. The last index
 * in segments should be no smaller than the last index of snapshot, otherwise
 * we may have hole when append further log.
 */
public class SegmentedRaftLog extends RaftLog {
  static final String HEADER_STR = "RAFTLOG1";
  static final byte[] HEADER_BYTES = HEADER_STR.getBytes(Charsets.UTF_8);

  /**
   * I/O task definitions. A task can be marked as done exactly like a latch:
   * {@link #done()} wakes up every thread blocked in {@link #waitForDone()}.
   */
  static abstract class Task {
    private boolean done = false;

    /** Mark this task as finished and wake up all waiters. */
    synchronized void done() {
      done = true;
      notifyAll();
    }

    /**
     * Block until {@link #done()} has been called.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    synchronized void waitForDone() throws InterruptedException {
      // loop to guard against spurious wake-ups
      while (!done) {
        wait();
      }
    }

    abstract void execute() throws IOException;

    abstract long getEndIndex();

    @Override
    public String toString() {
      return getClass().getSimpleName() + "-" + getEndIndex();
    }
  }

  /**
   * The most recent I/O task submitted by the current thread (set by
   * {@link #truncate(long)} and {@link #appendEntry(LogEntryProto)}); this is
   * the task {@link #logSync()} waits for.
   */
  private static final ThreadLocal<Task> myTask = new ThreadLocal<>();

  private final RaftStorage storage;
  private final RaftLogCache cache;
  private final RaftLogWorker fileLogWorker;
  private final long segmentMaxSize;

  /**
   * @param selfId id passed to the {@link RaftLog} super class
   * @param server the server handed to the {@link RaftLogWorker}
   * @param storage the storage providing segment files and the meta file
   * @param lastIndexInSnapshot used as the initial value of lastCommitted
   * @param properties source of the max segment size setting
   */
  public SegmentedRaftLog(String selfId, RaftServerImpl server, RaftStorage storage,
      long lastIndexInSnapshot, RaftProperties properties) throws IOException {
    super(selfId);
    this.storage = storage;
    this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY,
        RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT);
    cache = new RaftLogCache();
    fileLogWorker = new RaftLogWorker(server, storage, properties);
    lastCommitted.set(lastIndexInSnapshot);
  }

  /**
   * Load the existing segments into the cache, start the log worker at the
   * larger of the cache's end index and the snapshot's last index, and then
   * delegate to the super class.
   */
  @Override
  public void open(ConfigurationManager confManager, long lastIndexInSnapshot)
      throws IOException {
    loadLogSegments(confManager, lastIndexInSnapshot);
    File openSegmentFile = null;
    if (cache.getOpenSegment() != null) {
      openSegmentFile = storage.getStorageDir()
          .getOpenLogFile(cache.getOpenSegment().getStartIndex());
    }
    fileLogWorker.start(Math.max(cache.getEndIndex(), lastIndexInSnapshot),
        openSegmentFile);
    super.open(confManager, lastIndexInSnapshot);
  }

  @Override
  public long getStartIndex() {
    return cache.getStartIndex();
  }

  /**
   * Parse every segment file found in the storage directory and add it to the
   * cache, holding the write lock for the whole operation.
   */
  private void loadLogSegments(ConfigurationManager confManager,
      long lastIndexInSnapshot) throws IOException {
    try(AutoCloseableLock writeLock = writeLock()) {
      List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles();
      for (LogPathAndIndex pi : paths) {
        LogSegment logSegment = parseLogSegment(pi, confManager);
        cache.addSegment(logSegment);
      }

      // if the largest index is smaller than the last index in snapshot, we do
      // not load the log to avoid holes between log segments. This may happen
      // when the local I/O worker is too slow to persist log (slower than
      // committing the log and taking snapshot)
      if (!cache.isEmpty() && cache.getEndIndex() < lastIndexInSnapshot) {
        LOG.warn("End log index {} is smaller than last index in snapshot {}",
            cache.getEndIndex(), lastIndexInSnapshot);
        cache.clear();
        // TODO purge all segment files
      }
    }
  }

  /**
   * Load one segment file. A segment whose end index equals
   * {@link RaftServerConstants#INVALID_LOG_INDEX} is treated as the open
   * segment.
   */
  private LogSegment parseLogSegment(LogPathAndIndex pi,
      ConfigurationManager confManager) throws IOException {
    final boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX;
    return LogSegment.loadSegment(pi.path.toFile(), pi.startIndex, pi.endIndex,
        isOpen, confManager);
  }

  @Override
  public LogEntryProto get(long index) {
    checkLogState();
    try(AutoCloseableLock readLock = readLock()) {
      return cache.getEntry(index);
    }
  }

  @Override
  public LogEntryProto[] getEntries(long startIndex, long endIndex) {
    checkLogState();
    try(AutoCloseableLock readLock = readLock()) {
      return cache.getEntries(startIndex, endIndex);
    }
  }

  @Override
  public LogEntryProto getLastEntry() {
    checkLogState();
    try(AutoCloseableLock readLock = readLock()) {
      return cache.getLastEntry();
    }
  }

  /**
   * The method, along with {@link #appendEntry} and
   * {@link #append(LogEntryProto...)} need protection of RaftServer's lock.
   *
   * Truncates the cache at the given index; when the cache reports segments
   * to truncate, the corresponding I/O task is submitted to the log worker
   * and remembered for {@link #logSync()}.
   */
  @Override
  void truncate(long index) {
    checkLogState();
    try(AutoCloseableLock writeLock = writeLock()) {
      RaftLogCache.TruncationSegments ts = cache.truncate(index);
      if (ts != null) {
        Task task = fileLogWorker.truncate(ts);
        myTask.set(task);
      }
    }
  }

  /**
   * Append a single entry: open a new segment or roll the current open
   * segment when needed, then add the entry to the cache and submit the write
   * to the log worker. The submitted task is remembered for
   * {@link #logSync()}.
   */
  @Override
  void appendEntry(LogEntryProto entry) {
    checkLogState();
    try(AutoCloseableLock writeLock = writeLock()) {
      final LogSegment currentOpenSegment = cache.getOpenSegment();
      if (currentOpenSegment == null) {
        // no open segment yet: start one at this entry's index
        cache.addSegment(LogSegment.newOpenSegment(entry.getIndex()));
        fileLogWorker.startLogSegment(getNextIndex());
      } else if (isSegmentFull(currentOpenSegment, entry)) {
        cache.rollOpenSegment(true);
        fileLogWorker.rollLogSegment(currentOpenSegment);
      } else if (currentOpenSegment.numOfEntries() > 0 &&
          currentOpenSegment.getLastRecord().entry.getTerm() != entry.getTerm()) {
        // the term changes
        final long currentTerm = currentOpenSegment.getLastRecord().entry
            .getTerm();
        Preconditions.checkState(currentTerm < entry.getTerm(),
            "open segment's term %s is larger than the new entry's term %s",
            currentTerm, entry.getTerm());
        cache.rollOpenSegment(true);
        fileLogWorker.rollLogSegment(currentOpenSegment);
      }

      cache.appendEntry(entry);
      myTask.set(fileLogWorker.writeLogEntry(entry));
    }
  }

  /**
   * @return true if the segment has already reached the size limit, or if the
   *         entry itself fits within the limit but adding it to the segment
   *         would exceed the limit.
   */
  private boolean isSegmentFull(LogSegment segment, LogEntryProto entry) {
    if (segment.getTotalSize() >= segmentMaxSize) {
      return true;
    } else {
      final long entrySize = LogSegment.getEntrySize(entry);
      // if entry size is greater than the max segment size, write it directly
      // into the current segment
      return entrySize <= segmentMaxSize &&
          segment.getTotalSize() + entrySize > segmentMaxSize;
    }
  }

  /**
   * Append the given entries. Entries already stored with the same index and
   * term are skipped; at the first stored entry whose term differs, the log
   * is truncated from that index and the remaining entries are appended.
   * A null or empty array is a no-op.
   */
  @Override
  public void append(LogEntryProto... entries) {
    checkLogState();
    if (entries == null || entries.length == 0) {
      return;
    }
    try(AutoCloseableLock writeLock = writeLock()) {
      Iterator<LogEntryProto> iter = cache.iterator(entries[0].getIndex());
      int index = 0;
      long truncateIndex = -1;
      for (; iter.hasNext() && index < entries.length; index++) {
        LogEntryProto storedEntry = iter.next();
        Preconditions.checkState(
            storedEntry.getIndex() == entries[index].getIndex(),
            "The stored entry's index %s is not consistent with" +
                " the received entries[%s]'s index %s", storedEntry.getIndex(),
            index, entries[index].getIndex());

        if (storedEntry.getTerm() != entries[index].getTerm()) {
          // we should truncate from the storedEntry's index
          truncateIndex = storedEntry.getIndex();
          break;
        }
      }
      if (truncateIndex != -1) {
        // truncate from truncateIndex
        truncate(truncateIndex);
      }
      // append from entries[index]
      for (int i = index; i < entries.length; i++) {
        appendEntry(entries[i]);
      }
    }
  }

  /**
   * Wait until the last I/O task submitted by the calling thread is done.
   * Returns immediately if the calling thread has not submitted any task.
   */
  @Override
  public void logSync() throws InterruptedException {
    CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null);
    final Task task = myTask.get();
    if (task != null) {
      task.waitForDone();
    }
  }

  @Override
  public long getLatestFlushedIndex() {
    return fileLogWorker.getFlushedIndex();
  }

  /**
   * {@inheritDoc}
   *
   * This operation is protected by the RaftServer's lock
   */
  @Override
  public void writeMetadata(long term, String votedFor) throws IOException {
    storage.getMetaFile().set(term, votedFor);
  }

  @Override
  public Metadata loadMetadata() throws IOException {
    return new Metadata(storage.getMetaFile().getVotedFor(),
        storage.getMetaFile().getTerm());
  }

  @Override
  public void syncWithSnapshot(long lastSnapshotIndex) {
    fileLogWorker.syncWithSnapshot(lastSnapshotIndex);
    // TODO purge log files and normal/tmp/corrupt snapshot files
    // if the last index in snapshot is larger than the index of the last
    // log entry, we should delete all the log entries and their cache to avoid
    // gaps between log segments.
  }

  /**
   * Close the log, the log worker and the storage, in that order. Each
   * resource is closed even if closing an earlier one throws, so that a
   * failure does not leave the worker or the storage open.
   */
  @Override
  public void close() throws IOException {
    try {
      super.close();
    } finally {
      try {
        fileLogWorker.close();
      } finally {
        storage.close();
      }
    }
  }

  @VisibleForTesting
  RaftLogCache getRaftLogCache() {
    return cache;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java new file mode 100644 index 0000000..73b2af9 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.ratis.server.storage;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto;
import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.MD5FileUtil;
import org.apache.ratis.util.RaftUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
 * Manage snapshots of a raft peer.
 * TODO: snapshot should be treated as compaction log thus can be merged into
 * RaftLog. In this way we can have a unified getLastTermIndex interface.
 */
public class SnapshotManager {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);

  private final RaftStorage storage;
  private final String selfId;

  public SnapshotManager(RaftStorage storage, String selfId)
      throws IOException {
    this.storage = storage;
    this.selfId = selfId;
  }

  /**
   * Write the file chunks carried by the request into a temporary directory
   * and, when the request is marked as done, move that directory into place
   * as the state machine directory.
   *
   * For every chunk marked as done, the MD5 digest of the written file is
   * compared with the digest in the chunk and then saved next to the file.
   *
   * @param stateMachine used to look up the latest local snapshot
   * @param request the install snapshot request carrying the file chunks
   * @throws IOException if a local snapshot with an index not smaller than
   *         the request's index already exists, if the temporary directory
   *         cannot be created, if a digest does not match, or if the
   *         temporary directory cannot be renamed to the state machine
   *         directory
   */
  public void installSnapshot(StateMachine stateMachine,
      InstallSnapshotRequestProto request) throws IOException {
    final long lastIncludedIndex = request.getTermIndex().getIndex();
    final RaftStorageDirectory dir = storage.getStorageDir();

    final File tmpDir = dir.getNewTempDir();
    // mkdirs() also returns false when the directory already exists, which is
    // not an error here
    if (!tmpDir.mkdirs() && !tmpDir.isDirectory()) {
      throw new IOException("Failed to create tmp dir " + tmpDir
          + " for snapshot-" + lastIncludedIndex + " installation");
    }
    tmpDir.deleteOnExit();

    LOG.info("Installing snapshot:{}, to tmp dir:{}", request, tmpDir);

    // TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
    // and are not lost when whole request cycle is done. Check requestId and requestIndex here

    for (FileChunkProto chunk : request.getFileChunksList()) {
      SnapshotInfo pi = stateMachine.getLatestSnapshot();
      if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
        throw new IOException("There exists snapshot file "
            + pi.getFiles() + " in " + selfId
            + " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
      }

      String fileName = chunk.getFilename(); // this is relative to the root dir
      // TODO: assumes flat layout inside SM dir
      File tmpSnapshotFile = new File(tmpDir,
          new File(dir.getRoot(), fileName).getName());

      writeChunk(tmpSnapshotFile, chunk);

      // rename the temp snapshot file if this is the last chunk. also verify
      // the md5 digest and create the md5 meta-file.
      if (chunk.getDone()) {
        final MD5Hash expectedDigest =
            new MD5Hash(chunk.getFileDigest().toByteArray());
        // calculate the checksum of the snapshot file and compare it with the
        // file digest in the request
        MD5Hash digest = MD5FileUtil.computeMd5ForFile(tmpSnapshotFile);
        if (!digest.equals(expectedDigest)) {
          LOG.warn("The snapshot md5 digest {} does not match expected {}",
              digest, expectedDigest);
          // TODO: rename the temp snapshot file to .corrupt
          throw new IOException("MD5 mismatch for snapshot-" + lastIncludedIndex
              + " installation");
        } else {
          MD5FileUtil.saveMD5File(tmpSnapshotFile, digest);
        }
      }
    }

    if (request.getDone()) {
      final File smDir = dir.getStateMachineDir();
      LOG.info("Install snapshot is done, renaming tnp dir:{} to:{}",
          tmpDir, smDir);
      if (smDir.exists() && !smDir.delete()) {
        LOG.warn("Failed to delete the existing state machine dir {}", smDir);
      }
      // a failed rename means the snapshot has NOT been installed; report it
      // instead of silently returning success
      if (!tmpDir.renameTo(smDir)) {
        throw new IOException("Failed to rename tmp dir " + tmpDir + " to "
            + smDir + " for snapshot-" + lastIncludedIndex + " installation");
      }
    }
  }

  /**
   * Write the data of the given chunk into the target file at the chunk's
   * offset.
   *
   * A chunk with offset 0 starts the file from scratch: any existing file is
   * removed first. A chunk with a non-zero offset requires the file to exist
   * already. The data is written at exactly the chunk's offset; an
   * append-mode stream cannot be used for this since it always writes at the
   * end of the file regardless of the channel position.
   */
  private static void writeChunk(File target, FileChunkProto chunk)
      throws IOException {
    final long offset = chunk.getOffset();
    if (offset == 0) {
      // if offset is 0, delete any existing temp snapshot file if it has the
      // same last index.
      if (target.exists()) {
        FileUtils.fullyDelete(target);
      }
    } else {
      Preconditions.checkState(target.exists());
    }

    try (RandomAccessFile out = new RandomAccessFile(target, "rw")) {
      if (offset == 0) {
        // make sure no stale content survives if the delete above failed
        out.setLength(0);
      }
      out.seek(offset);
      // write data to the file
      out.write(chunk.getData().toByteArray());
    }
  }
}
package org.apache.ratis.statemachine;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
import org.apache.ratis.util.LifeCycle;

/**
 * A do-little {@link StateMachine} intended as a starting point for real
 * implementations: every operation is either a no-op or passes its argument
 * straight through.
 */
public class BaseStateMachine implements StateMachine {

  protected RaftProperties properties;
  protected RaftStorage storage;
  protected RaftConfiguration raftConf;
  protected final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName());

  /** A storage that keeps nothing and therefore never has a snapshot. */
  private static final class EmptyStateMachineStorage implements StateMachineStorage {
    @Override
    public void init(RaftStorage raftStorage) throws IOException {
      // nothing to initialize
    }

    @Override
    public SnapshotInfo getLatestSnapshot() {
      return null;
    }

    @Override
    public void format() throws IOException {
      // nothing to format
    }
  }

  @Override
  public LifeCycle.State getLifeCycleState() {
    return lifeCycle.getCurrentState();
  }

  /** Names the life cycle after this class and the id, then keeps the arguments. */
  @Override
  public void initialize(String id, RaftProperties properties, RaftStorage storage)
      throws IOException {
    final String lifeCycleName = getClass().getSimpleName() + ":" + id;
    lifeCycle.setName(lifeCycleName);
    this.properties = properties;
    this.storage = storage;
  }

  @Override
  public void reinitialize(String id, RaftProperties properties, RaftStorage storage)
      throws IOException {
  }

  @Override
  public void pause() {
  }

  @Override
  public void close() throws IOException {
    // do nothing
  }

  @Override
  public RaftConfiguration getRaftConfiguration() {
    return this.raftConf;
  }

  @Override
  public void setRaftConfiguration(RaftConfiguration conf) {
    this.raftConf = conf;
  }

  /** @return a new storage object which has no snapshot. */
  @Override
  public StateMachineStorage getStateMachineStorage() {
    return new EmptyStateMachineStorage();
  }

  /** @return whatever the storage from {@link #getStateMachineStorage()} reports. */
  @Override
  public SnapshotInfo getLatestSnapshot() {
    final StateMachineStorage smStorage = getStateMachineStorage();
    return smStorage.getLatestSnapshot();
  }

  /** @return {@link RaftServerConstants#INVALID_LOG_INDEX}; no snapshot is taken. */
  @Override
  public long takeSnapshot() throws IOException {
    return RaftServerConstants.INVALID_LOG_INDEX;
  }

  @Override
  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException {
    // do nothing
  }

  /** @return null; queries are not supported by this base implementation. */
  @Override
  public CompletableFuture<RaftClientReply> query(
      RaftClientRequest request) {
    return null;
  }

  /** Wraps the content of the request message into a new transaction. */
  @Override
  public TransactionContext startTransaction(RaftClientRequest request)
      throws IOException {
    final SMLogEntryProto smLogEntry = SMLogEntryProto.newBuilder()
        .setData(request.getMessage().getContent())
        .build();
    return new TransactionContext(this, request, smLogEntry);
  }

  @Override
  public TransactionContext preAppendTransaction(TransactionContext trx) throws IOException {
    return trx;
  }

  @Override
  public TransactionContext cancelTransaction(TransactionContext trx) throws IOException {
    return trx;
  }

  @Override
  public TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException {
    return trx;
  }

  /**
   * @return an already-completed future whose message lazily reads the data
   *         of the state machine log entry held by the transaction.
   */
  @Override
  public CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException {
    // echo back the data contained in the entry
    return CompletableFuture.<Message>completedFuture(
        () -> trx.getLogEntry().get().getSmLogEntry().getData());
  }
}
package org.apache.ratis.statemachine;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;

/**
 * Each snapshot has a list of files.
 *
 * The objects of this class are immutable.
 */
public class FileListSnapshotInfo implements SnapshotInfo {
  private final TermIndex termIndex;
  private final List<FileInfo> files;

  /**
   * @param files the files of the snapshot; the list is copied so that later
   *              changes made by the caller are not visible through this object
   * @param term the term of the snapshot
   * @param index the index of the snapshot
   */
  public FileListSnapshotInfo(List<FileInfo> files, long term, long index) {
    this.termIndex = TermIndex.newTermIndex(term, index);
    // unmodifiableList() alone is only a read-only view of the caller's list;
    // copy first so this object really is immutable as documented
    this.files = Collections.unmodifiableList(new ArrayList<>(files));
  }

  @Override
  public TermIndex getTermIndex() {
    return termIndex;
  }

  @Override
  public long getTerm() {
    return termIndex.getTerm();
  }

  @Override
  public long getIndex() {
    return termIndex.getIndex();
  }

  /** @return an unmodifiable list of the files of this snapshot. */
  @Override
  public List<FileInfo> getFiles() {
    return files;
  }

  @Override
  public String toString() {
    return termIndex + ":" + files;
  }
}
package org.apache.ratis.statemachine;


import com.google.common.annotations.VisibleForTesting;

import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.util.AtomicFileOutputStream;
import org.apache.ratis.util.MD5FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * A {@link StateMachineStorage} whose snapshot consists of exactly one file,
 * named "snapshot.term_index" and kept in the state machine directory.
 */
public class SimpleStateMachineStorage implements StateMachineStorage {

  private static final Logger LOG = LoggerFactory.getLogger(SimpleStateMachineStorage.class);

  static final String SNAPSHOT_FILE_PREFIX = "snapshot";
  static final String CORRUPT_SNAPSHOT_FILE_SUFFIX = ".corrupt";
  /** snapshot.term_index */
  static final Pattern SNAPSHOT_REGEX =
      Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)");

  private RaftStorage raftStorage;
  private File smDir = null;

  private volatile SingleFileSnapshotInfo currentSnapshot = null;

  // ---------------------------------------------------------------- naming

  /** @return the snapshot file name for the given term and end index. */
  public static String getSnapshotFileName(long term, long endIndex) {
    return new StringBuilder(SNAPSHOT_FILE_PREFIX)
        .append(".").append(term)
        .append("_").append(endIndex)
        .toString();
  }

  /** @return the snapshot file name followed by the tmp extension. */
  protected static String getTmpSnapshotFileName(long term, long endIndex) {
    final String baseName = getSnapshotFileName(term, endIndex);
    return baseName + AtomicFileOutputStream.TMP_EXTENSION;
  }

  /** @return the snapshot file name followed by the corrupt suffix. */
  protected static String getCorruptSnapshotFileName(long term, long endIndex) {
    final String baseName = getSnapshotFileName(term, endIndex);
    return baseName + CORRUPT_SNAPSHOT_FILE_SUFFIX;
  }

  /**
   * Extract the term and the index encoded in a snapshot file name.
   *
   * @throws IllegalArgumentException if the file name does not match
   *         {@link #SNAPSHOT_REGEX}
   */
  @VisibleForTesting
  public static TermIndex getTermIndexFromSnapshotFile(File file) {
    final Matcher matcher = SNAPSHOT_REGEX.matcher(file.getName());
    if (matcher.matches()) {
      final long term = Long.parseLong(matcher.group(1));
      final long index = Long.parseLong(matcher.group(2));
      return TermIndex.newTermIndex(term, index);
    }
    throw new IllegalArgumentException("File \"" + file
        + "\" does not match snapshot file name pattern \""
        + SNAPSHOT_REGEX + "\"");
  }

  // ----------------------------------------------------------------- files

  public File getSnapshotFile(long term, long endIndex) {
    final String name = getSnapshotFileName(term, endIndex);
    return new File(smDir, name);
  }

  protected File getTmpSnapshotFile(long term, long endIndex) {
    final String name = getTmpSnapshotFileName(term, endIndex);
    return new File(smDir, name);
  }

  protected File getCorruptSnapshotFile(long term, long endIndex) {
    final String name = getCorruptSnapshotFileName(term, endIndex);
    return new File(smDir, name);
  }

  // ------------------------------------------------------------- life cycle

  /** Remember the storage and its state machine dir, then load the latest snapshot. */
  public void init(RaftStorage raftStorage) throws IOException {
    this.raftStorage = raftStorage;
    this.smDir = raftStorage.getStorageDir().getStateMachineDir();
    loadLatestSnapshot();
  }

  @Override
  public void format() throws IOException {
    // TODO
  }

  // -------------------------------------------------------------- snapshots

  /**
   * Scan the state machine directory for files matching
   * {@link #SNAPSHOT_REGEX} and pick the one with the largest end index.
   *
   * @return the snapshot with the largest end index, or null if no file
   *         matches.
   */
  public SingleFileSnapshotInfo findLatestSnapshot() throws IOException {
    SingleFileSnapshotInfo newest = null;
    try (DirectoryStream<Path> entries =
             Files.newDirectoryStream(smDir.toPath())) {
      for (Path candidate : entries) {
        final Matcher matcher =
            SNAPSHOT_REGEX.matcher(candidate.getFileName().toString());
        if (!matcher.matches()) {
          continue; // not a snapshot file
        }
        final long endIndex = Long.parseLong(matcher.group(2));
        if (newest != null && endIndex <= newest.getIndex()) {
          continue; // not newer than the best one found so far
        }
        final long term = Long.parseLong(matcher.group(1));
        final MD5Hash storedDigest =
            MD5FileUtil.readStoredMd5ForFile(candidate.toFile());
        newest = new SingleFileSnapshotInfo(
            new FileInfo(candidate, storedDigest), term, endIndex);
      }
    }
    return newest;
  }

  /** Re-scan the directory and cache the result as the current snapshot. */
  public void loadLatestSnapshot() throws IOException {
    this.currentSnapshot = findLatestSnapshot();
  }

  /** @return the snapshot cached by the last {@link #loadLatestSnapshot()}, possibly null. */
  @Override
  public SingleFileSnapshotInfo getLatestSnapshot() {
    return currentSnapshot;
  }
}
package org.apache.ratis.statemachine;

import java.util.Arrays;

import org.apache.ratis.server.storage.FileInfo;

/**
 * A {@link FileListSnapshotInfo} whose file list holds exactly one file.
 *
 * The objects of this class are immutable.
 */
public class SingleFileSnapshotInfo extends FileListSnapshotInfo {
  /**
   * @param fileInfo the one file making up the snapshot
   * @param term the term of the snapshot
   * @param endIndex the index of the snapshot
   */
  public SingleFileSnapshotInfo(FileInfo fileInfo, long term, long endIndex) {
    super(Arrays.<FileInfo>asList(fileInfo), term, endIndex);
  }

  /** @return the file associated with the snapshot. */
  public FileInfo getFile() {
    // the list built by the constructor has a single element
    final FileInfo onlyFile = getFiles().get(0);
    return onlyFile;
  }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.statemachine; + +import java.util.List; + +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.FileInfo; + +/** + * SnapshotInfo represents a durable state by the state machine. The state machine implementation is + * responsible for the layout of the snapshot files as well as making the data durable. Latest term, + * latest index, and the raft configuration must be saved together with any data files in the + * snapshot. + */ +public interface SnapshotInfo { + + /** + * Returns the term and index corresponding to this snapshot. + * @return The term and index corresponding to this snapshot. + */ + TermIndex getTermIndex(); + + /** + * Returns the term corresponding to this snapshot. + * @return The term corresponding to this snapshot. + */ + long getTerm(); + + /** + * Returns the index corresponding to this snapshot. + * @return The index corresponding to this snapshot. + */ + long getIndex(); + + /** + * Returns a list of files corresponding to this snapshot. This list should include all + * the files that the state machine keeps in its data directory. This list of files will be + * copied to other replicas in install snapshot RPCs. + * @return a list of Files corresponding to this snapshot. 
+ */ + List<FileInfo> getFiles(); +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java new file mode 100644 index 0000000..e377aa7 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.statemachine; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftConfiguration; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.util.LifeCycle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * StateMachine is the entry point for the custom implementation of replicated state as defined in + * the "State Machine Approach" in the literature + * (see https://en.wikipedia.org/wiki/State_machine_replication). + */ +public interface StateMachine extends Closeable { + /** + * Initializes the State Machine with the given properties and storage. The state machine is + * responsible for reading the latest snapshot from the file system (if any) and initializing itself + * with the latest term and index there including all the edits. + */ + void initialize(String id, RaftProperties properties, RaftStorage storage) + throws IOException; + + /** + * Returns the lifecycle state for this StateMachine. + * @return the lifecycle state. + */ + LifeCycle.State getLifeCycleState(); + + /** + * Pauses the state machine. On return, the state machine should have closed all open files so + * that a new snapshot can be installed. + */ + void pause(); + + /** + * Re-initializes the State Machine in PAUSED state with the given properties and storage. The + * state machine is responsible for reading the latest snapshot from the file system (if any) and + * initializing itself with the latest term and index there including all the edits. 
+ */ + void reinitialize(String id, RaftProperties properties, RaftStorage storage) + throws IOException; + + /** + * Dump the in-memory state into a snapshot file in the RaftStorage. The + * StateMachine implementation can decide 1) its own snapshot format, 2) when + * a snapshot is taken, and 3) how the snapshot is taken (e.g., whether the + * snapshot blocks the state machine, and whether to purge log entries after + * a snapshot is done). + * + * In the meanwhile, when the size of raft log outside of the latest snapshot + * exceeds certain threshold, the RaftServer may choose to trigger a snapshot + * if {@link RaftServerConfigKeys#RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY} is + * enabled. + * + * The snapshot should include the latest raft configuration. + * + * @return the largest index of the log entry that has been applied to the + * state machine and also included in the snapshot. Note the log purge + * should be handled separately. + */ + // TODO: refactor this + long takeSnapshot() throws IOException; + + /** + * Record the RaftConfiguration in the state machine. The RaftConfiguration + * should also be stored in the snapshot. + */ + void setRaftConfiguration(RaftConfiguration conf); + + /** + * @return the latest raft configuration recorded in the state machine. + */ + RaftConfiguration getRaftConfiguration(); + + /** + * @return StateMachineStorage to interact with the durability guarantees provided by the + * state machine. + */ + StateMachineStorage getStateMachineStorage(); + + /** + * Returns the information for the latest durable snapshot. + */ + SnapshotInfo getLatestSnapshot(); + + /** + * Query the state machine. The request must be read-only. + * TODO: extend RaftClientRequest to have a read-only request subclass. + */ + CompletableFuture<RaftClientReply> query(RaftClientRequest request); + + /** + * Validate/pre-process the incoming update request in the state machine. + * @return the content to be written to the log entry. 
Null means the request + * should be rejected. + * @throws IOException thrown by the state machine during validation + */ + TransactionContext startTransaction(RaftClientRequest request) + throws IOException; + + /** + * This is called before the transaction passed from the StateMachine is appended to the raft log. + * This method will be called from log append and has the same strict serial order that the + * transactions will have in the RAFT log. Since this is called serially in the critical path of + * log append, it is important to do only required operations here. + * @return The Transaction context. + */ + TransactionContext preAppendTransaction(TransactionContext trx) throws IOException; + + /** + * Called to notify the state machine that the Transaction passed cannot be appended (or synced). + * The exception field will indicate whether there was an exception or not. + * @param trx the transaction to cancel + * @return cancelled transaction + */ + TransactionContext cancelTransaction(TransactionContext trx) throws IOException; + + /** + * Called for transactions that have been committed to the RAFT log. This step is called + * sequentially, in the same strict serial order in which the transactions have been committed in the log. + * The SM is expected to do only necessary work, and leave the actual apply operation to the + * applyTransaction calls that can happen concurrently. + * @param trx the transaction state including the log entry that has been committed to a quorum + * of the raft peers + * @return The Transaction context. + */ + TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException; + + /** + * Apply a committed log entry to the state machine. This method can be called concurrently with + * the other calls, and there is no guarantee that the calls will be ordered according to the + * log commit order. 
+ * @param trx the transaction state including the log entry that has been committed to a quorum + * of the raft peers + */ + // TODO: We do not need to return CompletableFuture + CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException; + + /** + * Notify the state machine that the raft peer is no longer leader. + */ + void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java new file mode 100644 index 0000000..4f7951a --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.statemachine; + +import java.io.IOException; + +import org.apache.ratis.server.storage.RaftStorage; + +public interface StateMachineStorage { + + void init(RaftStorage raftStorage) throws IOException; + + /** + * Returns the information for the latest durable snapshot. + */ + SnapshotInfo getLatestSnapshot(); + + // TODO: StateMachine can decide to compact the files independently of concurrent install snapshot + // etc requests. We should have ref counting for the SnapshotInfo with a release mechanism + // so that raft server will release the files after the snapshot file copy in case a compaction + // is waiting for deleting these files. + + void format() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java new file mode 100644 index 0000000..81bea45 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.statemachine; + +import java.io.IOException; +import java.util.Collection; +import java.util.Optional; + +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; + +/** + * Context for a transaction. + * The transaction might have originated from a client request, or it + * may be coming from another replica of the state machine through the RAFT log. + * {@link TransactionContext} can be created from + * either the {@link StateMachine} or the state machine updater. + * + * In the first case, the {@link StateMachine} is a leader. When it receives + * a {@link StateMachine#startTransaction(RaftClientRequest)} request, it returns + * a {@link TransactionContext} with the changes from the {@link StateMachine}. + * The same context will be passed back to the {@link StateMachine} + * via the {@link StateMachine#applyTransaction(TransactionContext)} call + * or the {@link StateMachine#notifyNotLeader(Collection)} call. + * + * In the second case, the {@link StateMachine} is a follower. + * The {@link TransactionContext} will be a committed entry coming from + * the RAFT log from the leader. + */ +public class TransactionContext { + + /** The {@link StateMachine} that originated the transaction. */ + private final StateMachine stateMachine; + + /** Original request from the client */ + private Optional<RaftClientRequest> clientRequest = Optional.empty(); + + /** Exception from the {@link StateMachine} or from the log */ + private Optional<Exception> exception = Optional.empty(); + + /** Data from the {@link StateMachine} */ + private Optional<SMLogEntryProto> smLogEntryProto = Optional.empty(); + + /** + * Context specific to the state machine. 
+ * The {@link StateMachine} can use this object to carry state between + * {@link StateMachine#startTransaction(RaftClientRequest)} and + * {@link StateMachine#applyTransaction(TransactionContext)}. + */ + private Optional<Object> stateMachineContext = Optional.empty(); + + /** + * Whether to commit the transaction to the RAFT Log. + * In some cases the {@link StateMachine} may want to indicate + * that the transaction should not be committed + */ + private boolean shouldCommit = true; + + /** Committed LogEntry. */ + private Optional<LogEntryProto> logEntry = Optional.empty(); + + private TransactionContext(StateMachine stateMachine) { + this.stateMachine = stateMachine; + } + + /** The same as this(stateMachine, clientRequest, smLogEntryProto, null). */ + public TransactionContext( + StateMachine stateMachine, RaftClientRequest clientRequest, + SMLogEntryProto smLogEntryProto) { + this(stateMachine, clientRequest, smLogEntryProto, null); + } + + /** + * Construct a {@link TransactionContext} from a client request. + * Used by the state machine to start a transaction + * and send the Log entry representing the transaction data + * to be applied to the raft log. + */ + public TransactionContext( + StateMachine stateMachine, RaftClientRequest clientRequest, + SMLogEntryProto smLogEntryProto, Object stateMachineContext) { + this(stateMachine); + this.clientRequest = Optional.of(clientRequest); + this.smLogEntryProto = Optional.ofNullable(smLogEntryProto); + this.stateMachineContext = Optional.ofNullable(stateMachineContext); + } + + /** The same as this(stateMachine, clientRequest, exception, null). */ + public TransactionContext( + StateMachine stateMachine, RaftClientRequest clientRequest, + Exception exception) { + this(stateMachine, clientRequest, exception, null); + } + + /** + * Construct a {@link TransactionContext} from a client request to signal + * an exception so that the RAFT server will fail the request on behalf + * of the {@link StateMachine}. 
+ */ + public TransactionContext( + StateMachine stateMachine, RaftClientRequest clientRequest, + Exception exception, Object stateMachineContext) { + this(stateMachine); + this.clientRequest = Optional.of(clientRequest); + this.exception = Optional.of(exception); + this.stateMachineContext = Optional.ofNullable(stateMachineContext); + } + + /** + * Construct a {@link TransactionContext} from a {@link LogEntryProto}. + * Used by followers for applying committed entries to the state machine. + * @param logEntry the log entry to be applied + */ + public TransactionContext(StateMachine stateMachine, LogEntryProto logEntry) { + this(stateMachine); + this.smLogEntryProto = Optional.of(logEntry.getSmLogEntry()); + this.logEntry = Optional.of(logEntry); + } + + public Optional<RaftClientRequest> getClientRequest() { + return this.clientRequest; + } + + public Optional<SMLogEntryProto> getSMLogEntry() { + return this.smLogEntryProto; + } + + public Optional<Exception> getException() { + return this.exception; + } + + public TransactionContext setStateMachineContext(Object stateMachineContext) { + this.stateMachineContext = Optional.ofNullable(stateMachineContext); + return this; + } + + public Optional<Object> getStateMachineContext() { + return stateMachineContext; + } + + public TransactionContext setLogEntry(LogEntryProto logEntry) { + this.logEntry = Optional.of(logEntry); + return this; + } + + public TransactionContext setSmLogEntryProto(SMLogEntryProto smLogEntryProto) { + this.smLogEntryProto = Optional.of(smLogEntryProto); + return this; + } + + public Optional<LogEntryProto> getLogEntry() { + return logEntry; + } + + private TransactionContext setException(IOException ioe) { + assert !this.exception.isPresent(); + this.exception = Optional.of(ioe); + return this; + } + + public TransactionContext setShouldCommit(boolean shouldCommit) { + this.shouldCommit = shouldCommit; + return this; + } + + public boolean shouldCommit() { + // TODO: Hook this up in the server 
to bypass the RAFT Log and send back a response to the client + return this.shouldCommit; + } + + // proxy StateMachine methods. We do not want to expose the SM to the RaftLog + + /** + * This is called before the transaction passed from the StateMachine is appended to the raft log. + * This method will be called from log append and has the same strict serial order that the + * Transactions will have in the RAFT log. Since this is called serially in the critical path of + * log append, it is important to do only required operations here. + * @return The Transaction context. + */ + public TransactionContext preAppendTransaction() throws IOException { + return stateMachine.preAppendTransaction(this); + } + + /** + * Called to notify the state machine that the Transaction passed cannot be appended (or synced). + * The exception field will indicate whether there was an exception or not. + * @return cancelled transaction + */ + public TransactionContext cancelTransaction() throws IOException { + // TODO: This is not called from Raft server / log yet. When an IOException happens, we should + // call this to let the SM know that Transaction cannot be synced + return stateMachine.cancelTransaction(this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java new file mode 100644 index 0000000..60cbb9c --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -0,0 +1,445 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import com.google.common.base.Preconditions; + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.client.impl.RaftClientImpl; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.server.impl.DelayLocalExecutionInjection; +import org.apache.ratis.server.impl.RaftConfiguration; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.storage.MemoryRaftLog; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.statemachine.BaseStateMachine; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.ExitUtils; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.RaftUtils; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +public abstract class MiniRaftCluster { + public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class); + public static final DelayLocalExecutionInjection 
logSyncDelay = + new DelayLocalExecutionInjection(RaftLog.LOG_SYNC); + + public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName(); + public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class"; + public static final Class<? extends StateMachine> STATEMACHINE_CLASS_DEFAULT = BaseStateMachine.class; + + public static abstract class Factory<CLUSTER extends MiniRaftCluster> { + public abstract CLUSTER newCluster( + String[] ids, RaftProperties prop, boolean formatted) + throws IOException; + + public CLUSTER newCluster( + int numServer, RaftProperties prop, boolean formatted) + throws IOException { + return newCluster(generateIds(numServer, 0), prop, formatted); + } + } + + public static abstract class RpcBase extends MiniRaftCluster { + public RpcBase(String[] ids, RaftProperties properties, boolean formatted) { + super(ids, properties, formatted); + } + + protected abstract RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException; + + @Override + protected void setPeerRpc() throws IOException { + for (RaftPeer p : conf.getPeers()) { + setPeerRpc(p); + } + } + + @Override + public void restartServer(String id, boolean format) throws IOException { + super.restartServer(id, format); + setPeerRpc(conf.getPeer(id)).start(); + } + + @Override + public void setBlockRequestsFrom(String src, boolean block) { + RaftTestUtil.setBlockRequestsFrom(src, block); + } + } + + public static class PeerChanges { + public final RaftPeer[] allPeersInNewConf; + public final RaftPeer[] newPeers; + public final RaftPeer[] removedPeers; + + public PeerChanges(RaftPeer[] all, RaftPeer[] newPeers, RaftPeer[] removed) { + this.allPeersInNewConf = all; + this.newPeers = newPeers; + this.removedPeers = removed; + } + } + + public static RaftConfiguration initConfiguration(String[] ids) { + return RaftConfiguration.newBuilder() + .setConf(Arrays.stream(ids).map(RaftPeer::new).collect(Collectors.toList())) + .build(); + } + + private static String 
getBaseDirectory() { + return System.getProperty("test.build.data", "target/test/data") + "/raft/"; + } + + private static void formatDir(String dirStr) { + final File serverDir = new File(dirStr); + Preconditions.checkState(FileUtils.fullyDelete(serverDir), + "Failed to format directory %s", dirStr); + LOG.info("Formatted directory {}", dirStr); + } + + public static String[] generateIds(int numServers, int base) { + String[] ids = new String[numServers]; + for (int i = 0; i < numServers; i++) { + ids[i] = "s" + (i + base); + } + return ids; + } + + protected RaftConfiguration conf; + protected final RaftProperties properties; + private final String testBaseDir; + protected final Map<String, RaftServerImpl> servers = + Collections.synchronizedMap(new LinkedHashMap<>()); + + public MiniRaftCluster(String[] ids, RaftProperties properties, + boolean formatted) { + this.conf = initConfiguration(ids); + this.properties = new RaftProperties(properties); + this.testBaseDir = getBaseDirectory(); + + conf.getPeers().forEach( + p -> servers.put(p.getId(), newRaftServer(p.getId(), conf, formatted))); + + ExitUtils.disableSystemExit(); + } + + protected <RPC extends RaftServerRpc> void init(Map<RaftPeer, RPC> peers) { + LOG.info("peers = " + peers.keySet()); + conf = RaftConfiguration.newBuilder().setConf(peers.keySet()).build(); + for (Map.Entry<RaftPeer, RPC> entry : peers.entrySet()) { + final RaftServerImpl server = servers.get(entry.getKey().getId()); + server.setInitialConf(conf); + server.setServerRpc(entry.getValue()); + } + } + + public void start() { + LOG.info("Starting " + getClass().getSimpleName()); + servers.values().forEach(RaftServerImpl::start); + } + + /** + * start a stopped server again. 
+ */ + public void restartServer(String id, boolean format) throws IOException { + killServer(id); + servers.remove(id); + servers.put(id, newRaftServer(id, conf, format)); + } + + public final void restart(boolean format) throws IOException { + servers.values().stream().filter(RaftServerImpl::isAlive) + .forEach(RaftServerImpl::close); + List<String> idList = new ArrayList<>(servers.keySet()); + for (String id : idList) { + servers.remove(id); + servers.put(id, newRaftServer(id, conf, format)); + } + + setPeerRpc(); + start(); + } + + protected abstract void setPeerRpc() throws IOException; + + public int getMaxTimeout() { + return properties.getInt( + RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY, + RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT); + } + + public RaftConfiguration getConf() { + return conf; + } + + private RaftServerImpl newRaftServer(String id, RaftConfiguration conf, + boolean format) { + final RaftServerImpl s; + try { + final String dirStr = testBaseDir + id; + if (format) { + formatDir(dirStr); + } + properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr); + s = new RaftServerImpl(id, conf, properties, getStateMachine4Test(properties)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return s; + } + + static StateMachine getStateMachine4Test(RaftProperties properties) { + final Class<? 
extends StateMachine> smClass = properties.getClass( + STATEMACHINE_CLASS_KEY, + STATEMACHINE_CLASS_DEFAULT, + StateMachine.class); + return RaftUtils.newInstance(smClass); + } + + public abstract RaftClientRequestSender getRaftClientRequestSender(); + + protected <RPC extends RaftServerRpc> Collection<RaftPeer> addNewPeers( + Map<RaftPeer, RPC> newPeers, Collection<RaftServerImpl> newServers, + boolean startService) throws IOException { + for (Map.Entry<RaftPeer, RPC> entry : newPeers.entrySet()) { + RaftServerImpl server = servers.get(entry.getKey().getId()); + server.setServerRpc(entry.getValue()); + } + if (startService) { + newServers.forEach(RaftServerImpl::start); + } + return new ArrayList<>(newPeers.keySet()); + } + + protected abstract Collection<RaftPeer> addNewPeers( + Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers, + boolean startService) throws IOException; + + public PeerChanges addNewPeers(int number, boolean startNewPeer) + throws IOException { + return addNewPeers(generateIds(number, servers.size()), startNewPeer); + } + + public PeerChanges addNewPeers(String[] ids, + boolean startNewPeer) throws IOException { + LOG.info("Add new peers {}", Arrays.asList(ids)); + Collection<RaftPeer> newPeers = new ArrayList<>(ids.length); + for (String id : ids) { + newPeers.add(new RaftPeer(id)); + } + + // create and add new RaftServers + final List<RaftServerImpl> newServers = new ArrayList<>(ids.length); + for (RaftPeer p : newPeers) { + RaftServerImpl newServer = newRaftServer(p.getId(), conf, true); + Preconditions.checkArgument(!servers.containsKey(p.getId())); + servers.put(p.getId(), newServer); + newServers.add(newServer); + } + + // for hadoop-rpc-enabled peer, we assign inetsocketaddress here + newPeers = addNewPeers(newPeers, newServers, startNewPeer); + + final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]); + newPeers.addAll(conf.getPeers()); + conf = 
RaftConfiguration.newBuilder().setConf(newPeers).setLogEntryIndex(0).build();
+    RaftPeer[] p = newPeers.toArray(new RaftPeer[newPeers.size()]);
+    return new PeerChanges(p, np, new RaftPeer[0]);
+  }
+
+  /**
+   * Starts the server registered under the given id.
+   *
+   * @param id the key used to look the server up in {@code servers}
+   */
+  public void startServer(String id) {
+    final RaftServerImpl server = servers.get(id);
+    assert server != null;
+    server.start();
+  }
+
+  /** Builds a {@link RaftPeer} from the id and the RPC socket address of the given server. */
+  private RaftPeer getPeer(RaftServerImpl s) {
+    return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress());
+  }
+
+  /**
+   * Prepares the peer list when removing some peers from the conf, and
+   * replaces {@code conf} with a configuration built from the remaining peers.
+   *
+   * @param number the maximum number of peers to remove; when
+   *               {@code removeLeader} is true the leader counts as one of them
+   * @param removeLeader whether the current leader is removed
+   * @param excluded peers that are never picked for removal
+   * @return a {@link PeerChanges} built from the remaining peers, an empty
+   *         array, and the removed peers
+   */
+  public PeerChanges removePeers(int number, boolean removeLeader,
+      Collection<RaftPeer> excluded) {
+    final Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
+    final List<RaftPeer> removedPeers = new ArrayList<>(number);
+    if (removeLeader) {
+      final RaftPeer leader = getPeer(getLeader());
+      assert !excluded.contains(leader);
+      peers.remove(leader);
+      removedPeers.add(leader);
+    }
+
+    // A removed leader already accounts for one of the requested removals.
+    final int followersToRemove = removeLeader ? number - 1 : number;
+    int removed = 0;
+    for (RaftServerImpl follower : getFollowers()) {
+      if (removed >= followersToRemove) {
+        break;
+      }
+      final RaftPeer toRemove = getPeer(follower);
+      if (!excluded.contains(toRemove)) {
+        peers.remove(toRemove);
+        removedPeers.add(toRemove);
+        removed++;
+      }
+    }
+    conf = RaftConfiguration.newBuilder().setConf(peers).setLogEntryIndex(0).build();
+    final RaftPeer[] p = peers.toArray(new RaftPeer[peers.size()]);
+    return new PeerChanges(p, new RaftPeer[0],
+        removedPeers.toArray(new RaftPeer[removedPeers.size()]));
+  }
+
+  /**
+   * Closes the server registered under the given id.
+   *
+   * @param id the key used to look the server up in {@code servers}
+   */
+  public void killServer(String id) {
+    servers.get(id).close();
+  }
+
+  /** @return a multi-line string with the number of servers and one line per server. */
+  public String printServers() {
+    final StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n");
+    for (RaftServerImpl s : servers.values()) {
+      b.append(" ").append(s).append("\n");
+    }
+    return b.toString();
+  }
+
+  /**
+   * @return the same listing as {@link #printServers()}, with the entry string
+   *         of the log appended for every server whose log is a
+   *         {@link MemoryRaftLog}.
+   */
+  public String printAllLogs() {
+    final StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n");
+    for (RaftServerImpl s : servers.values()) {
+      b.append(" ").append(s).append("\n");
+
+      final RaftLog log = s.getState().getLog();
+      if (log instanceof MemoryRaftLog) {
+        b.append(" ").append(((MemoryRaftLog) log).getEntryString());
+      }
+    }
+    return b.toString();
+  }
+
+  /**
+   * Finds the leader among the alive servers. When several alive servers
+   * report themselves as leader, only those with the highest current term are
+   * considered; the call fails through {@code Assert.fail} if more than one
+   * remains.
+   *
+   * @return the leader, or null if no alive server is a leader
+   */
+  public RaftServerImpl getLeader() {
+    final List<RaftServerImpl> leaders = new ArrayList<>();
+    for (RaftServerImpl s : servers.values()) {
+      if (!s.isAlive() || !s.isLeader()) {
+        continue;
+      }
+      if (leaders.isEmpty()) {
+        leaders.add(s);
+        continue;
+      }
+      final long leaderTerm = leaders.get(0).getState().getCurrentTerm();
+      final long term = s.getState().getCurrentTerm();
+      if (term > leaderTerm) {
+        // a leader with a newer term supersedes the ones collected so far
+        leaders.clear();
+      }
+      if (term >= leaderTerm) {
+        leaders.add(s);
+      }
+    }
+
+    if (leaders.isEmpty()) {
+      return null;
+    }
+    if (leaders.size() != 1) {
+      Assert.fail(printServers() + leaders.toString()
+          + "leaders.size() = " + leaders.size() + " != 1");
+    }
+    return leaders.get(0);
+  }
+
+  /**
+   * @return true if {@link #getLeader()} finds a leader whose id equals the
+   *         given id.
+   */
+  public boolean isLeader(String leaderId) throws InterruptedException {
+    final RaftServerImpl leader = getLeader();
+    return leader != null && leader.getId().equals(leaderId);
+  }
+
+  /** @return the servers that are alive and report themselves as follower. */
+  public List<RaftServerImpl> getFollowers() {
+    return servers.values().stream()
+        .filter(s -> s.isAlive() && s.isFollower())
+        .collect(Collectors.toList());
+  }
+
+  /** @return all servers held by this cluster. */
+  public Collection<RaftServerImpl> getServers() {
+    return servers.values();
+  }
+
+  /** @return the server registered under the given id, as returned by {@code servers.get(id)}. */
+  public RaftServerImpl getServer(String id) {
+    return servers.get(id);
+  }
+
+  /** @return one {@link RaftPeer} per server, built from its id and RPC socket address. */
+  public Collection<RaftPeer> getPeers() {
+    return getServers().stream()
+        .map(this::getPeer)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Creates a client over the peers of the current conf.
+   *
+   * @param clientId the id passed to the new client
+   * @param leaderId the leader id passed to the new client
+   */
+  public RaftClient createClient(String clientId, String leaderId) {
+    return new RaftClientImpl(clientId, conf.getPeers(),
+        getRaftClientRequestSender(), leaderId, properties);
+  }
+
+  /**
+   * Closes every alive server.
+   *
+   * @throws AssertionError if {@code ExitUtils.isTerminated()} reports an
+   *         unexpected exit
+   */
+  public void shutdown() {
+    LOG.info("Stopping " + getClass().getSimpleName());
+    servers.values().stream().filter(RaftServerImpl::isAlive)
+        .forEach(RaftServerImpl::close);
+
+    if (ExitUtils.isTerminated()) {
+      LOG.error("Test resulted in an unexpected exit",
+          ExitUtils.getFirstExitException());
+      throw new AssertionError("Test resulted in an unexpected exit");
+    }
+  }
+
+  /**
+   * Block all the incoming requests for the peer with leaderId. Also delay
+   * outgoing or incoming msg for all other peers.
+   */
+  protected abstract void blockQueueAndSetDelay(String leaderId, int delayMs)
+      throws InterruptedException;
+
+  /**
+   * Try to enforce the leader of the cluster.
+   * @param leaderId ID of the targeted leader server.
+   * @return true if server has been successfully enforced to the leader, false
+   *         otherwise.
+   */
+  public boolean tryEnforceLeader(String leaderId) throws InterruptedException {
+    // Do nothing if the given id is already the leader.
+    if (isLeader(leaderId)) {
+      return true;
+    }
+
+    // Block the RPC read process of all other servers so that a read takes at
+    // least ELECTION_TIMEOUT_MIN. In this way, when the target leader requests
+    // a vote, all non-leader servers can grant the vote.
+    // Disable the target leader server RPC so that it can request a vote.
+    blockQueueAndSetDelay(leaderId, RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
+
+    // Reopen queues so that the vote can make progress.
+    blockQueueAndSetDelay(leaderId, 0);
+
+    return isLeader(leaderId);
+  }
+
+  /** Block/unblock the requests sent from the given source. */
+  public abstract void setBlockRequestsFrom(String src, boolean block);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
new file mode 100644
index 0000000..4ec78b9
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.junit.*;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ratis.RaftTestUtil.waitAndKillLeader;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Basic tests (leader election, appending entries, enforcing a leader, and
+ * sending under load with leader changes) that run against the
+ * {@link MiniRaftCluster} supplied by a concrete subclass.
+ */
+public abstract class RaftBasicTests {
+  public static final Logger LOG = LoggerFactory.getLogger(RaftBasicTests.class);
+
+  public static final int NUM_SERVERS = 5;
+
+  protected static final RaftProperties properties = new RaftProperties();
+
+  /** @return the cluster under test. */
+  public abstract MiniRaftCluster getCluster();
+
+  public RaftProperties getProperties() {
+    return properties;
+  }
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(120 * 1000);
+
+  /** Checks that the cluster has no leader yet and then starts it. */
+  @Before
+  public void setup() throws IOException {
+    Assert.assertNull(getCluster().getLeader());
+    getCluster().start();
+  }
+
+  /** Shuts the cluster down, if there is one. */
+  @After
+  public void tearDown() {
+    final MiniRaftCluster cluster = getCluster();
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testBasicLeaderElection() throws Exception {
+    LOG.info("Running testBasicLeaderElection");
+    final MiniRaftCluster cluster = getCluster();
+    waitAndKillLeader(cluster, true);
+    waitAndKillLeader(cluster, true);
+    waitAndKillLeader(cluster, true);
+    waitAndKillLeader(cluster, false);
+  }
+
+  /**
+   * Kills one follower, sends 10 messages through a client, waits, and then
+   * checks the log entries of every alive server with
+   * {@code RaftTestUtil.assertLogEntries}.
+   */
+  @Test
+  public void testBasicAppendEntries() throws Exception {
+    LOG.info("Running testBasicAppendEntries");
+    final MiniRaftCluster cluster = getCluster();
+    final RaftServerImpl leader = waitForLeader(cluster);
+    final long term = leader.getState().getCurrentTerm();
+    final String killed = cluster.getFollowers().get(3).getId();
+    cluster.killServer(killed);
+    LOG.info(cluster.printServers());
+
+    final SimpleMessage[] messages = SimpleMessage.create(10);
+    try(final RaftClient client = cluster.createClient("client", null)) {
+      for (SimpleMessage message : messages) {
+        client.send(message);
+      }
+    }
+
+    Thread.sleep(cluster.getMaxTimeout() + 100);
+    LOG.info(cluster.printAllLogs());
+
+    cluster.getServers().stream().filter(RaftServerImpl::isAlive)
+        .map(s -> s.getState().getLog().getEntries(1, Long.MAX_VALUE))
+        .forEach(e -> RaftTestUtil.assertLogEntries(e, 1, term, messages));
+  }
+
+  /** Picks a random server id and waits for that server to become the leader. */
+  @Test
+  public void testEnforceLeader() throws Exception {
+    LOG.info("Running testEnforceLeader");
+    final String leader = "s" + ThreadLocalRandom.current().nextInt(NUM_SERVERS);
+    LOG.info("enforce leader to " + leader);
+    final MiniRaftCluster cluster = getCluster();
+    waitForLeader(cluster);
+    waitForLeader(cluster, leader);
+  }
+
+  /**
+   * A thread that sends a fixed array of messages through one client and
+   * records the first {@link IOException} it runs into.
+   */
+  static class Client4TestWithLoad extends Thread {
+    final RaftClient client;
+    final SimpleMessage[] messages;
+
+    /** Index of the next message to send. */
+    final AtomicInteger step = new AtomicInteger();
+    volatile Exception exceptionInClientThread;
+
+    Client4TestWithLoad(RaftClient client, int numMessages) {
+      this.client = client;
+      this.messages = SimpleMessage.create(numMessages, client.getId());
+    }
+
+    /** @return true while there are messages left to send and no exception was recorded. */
+    boolean isRunning() {
+      return step.get() < messages.length && exceptionInClientThread == null;
+    }
+
+    @Override
+    public void run() {
+      // try-with-resources closes the client even when a send fails; previously
+      // a failed send skipped close() and leaked the client.
+      try (final RaftClient c = client) {
+        while (isRunning()) {
+          c.send(messages[step.getAndIncrement()]);
+        }
+      } catch (IOException ioe) {
+        exceptionInClientThread = ioe;
+      }
+    }
+  }
+
+  @Test
+  public void testWithLoad() throws Exception {
+    testWithLoad(10, 500);
+  }
+
+  /**
+   * Runs {@code numClients} client threads that each send {@code numMessages}
+   * messages, changes the leader repeatedly while they are running, and
+   * finally checks the log entries of the servers for every client.
+   */
+  private void testWithLoad(final int numClients, final int numMessages)
+      throws Exception {
+    LOG.info("Running testWithLoad: numClients=" + numClients
+        + ", numMessages=" + numMessages);
+
+    final MiniRaftCluster cluster = getCluster();
+    LOG.info(cluster.printServers());
+
+    final List<Client4TestWithLoad> clients
+        = Stream.iterate(0, i -> i+1).limit(numClients)
+        .map(i -> cluster.createClient(String.valueOf((char)('a' + i)), null))
+        .map(c -> new Client4TestWithLoad(c, numMessages))
+        .collect(Collectors.toList());
+    clients.forEach(Thread::start);
+
+    int count = 0;
+    int lastStep = 0;
+    while (clients.stream().anyMatch(Client4TestWithLoad::isRunning)) {
+      final int n = clients.stream().mapToInt(c -> c.step.get()).sum();
+      // Change the leader only after the clients together advanced at least
+      // 50 steps per client since the previous change.
+      if (n - lastStep < 50 * numClients) {
+        Thread.sleep(10);
+        continue;
+      }
+      lastStep = n;
+      count++;
+
+      final RaftServerImpl leader = cluster.getLeader();
+      if (leader != null) {
+        final String oldLeader = leader.getId();
+        LOG.info("Block all requests sent by leader " + oldLeader);
+        final String newLeader = RaftTestUtil.changeLeader(cluster, oldLeader);
+        LOG.info("Changed leader from " + oldLeader + " to " + newLeader);
+        Assert.assertFalse(newLeader.equals(oldLeader));
+      }
+    }
+
+    for (Client4TestWithLoad c : clients) {
+      c.join();
+    }
+    for (Client4TestWithLoad c : clients) {
+      if (c.exceptionInClientThread != null) {
+        throw new AssertionError(c.exceptionInClientThread);
+      }
+      RaftTestUtil.assertLogEntries(cluster.getServers(), c.messages);
+    }
+
+    LOG.info("Leader change count=" + count + cluster.printAllLogs());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
new file mode 100644
index 0000000..6d25835
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software
Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.client.impl.RaftClientImpl;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.simulation.RequestHandler;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.*;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Tests of the reply a client gets when it sends a request to a server that
+ * is no longer the leader. The cluster is supplied by a concrete subclass
+ * through {@link #initCluster()}.
+ */
+public abstract class RaftNotLeaderExceptionBaseTest {
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(RaftNotLeaderExceptionBaseTest.class);
+  public static final int NUM_PEERS = 3;
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(60 * 1000);
+
+  private MiniRaftCluster cluster;
+
+  /** @return the cluster to run the tests against. */
+  public abstract MiniRaftCluster initCluster() throws IOException;
+
+  /** Creates the cluster through {@link #initCluster()} and starts it. */
+  @Before
+  public void setup() throws IOException {
+    this.cluster = initCluster();
+    cluster.start();
+  }
+
+  /** Shuts the cluster down, if one was created. */
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Sends a request carrying the given message directly to {@code leaderId}
+   * through the given sender, making up to 10 attempts and sleeping one second
+   * after each {@link IOException}.
+   *
+   * @return the reply, or null if no attempt produced one
+   */
+  private static RaftClientReply sendRequestWithRetry(
+      RaftClientRequestSender rpc, String leaderId, String message)
+      throws InterruptedException {
+    RaftClientReply reply = null;
+    for (int i = 0; reply == null && i < 10; i++) {
+      try {
+        reply = rpc.sendRequest(
+            new RaftClientRequest("client", leaderId, DEFAULT_SEQNUM,
+                new SimpleMessage(message)));
+      } catch (IOException ignored) {
+        // it is possible that the remote peer's rpc server is not ready. need retry
+        Thread.sleep(1000);
+      }
+    }
+    return reply;
+  }
+
+  /**
+   * Asserts that the reply is a non-null, unsuccessful not-leader reply that
+   * suggests the given new leader.
+   */
+  private static void assertNotLeaderReply(RaftClientReply reply, String newLeader) {
+    Assert.assertNotNull(reply);
+    Assert.assertFalse(reply.isSuccess());
+    Assert.assertTrue(reply.isNotLeader());
+    Assert.assertEquals(newLeader,
+        reply.getNotLeaderException().getSuggestedLeader().getId());
+  }
+
+  @Test
+  public void testHandleNotLeaderException() throws Exception {
+    RaftTestUtil.waitForLeader(cluster);
+    final String leaderId = cluster.getLeader().getId();
+    // try-with-resources: the client is closed even when an assertion fails
+    try (final RaftClient client = cluster.createClient("client", leaderId)) {
+      RaftClientReply reply = client.send(new SimpleMessage("m1"));
+      Assert.assertTrue(reply.isSuccess());
+
+      // enforce leader change
+      final String newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
+      Assert.assertNotEquals(leaderId, newLeader);
+
+      final RaftClientRequestSender rpc = ((RaftClientImpl)client).getRequestSender();
+      reply = sendRequestWithRetry(rpc, leaderId, "m2");
+      assertNotLeaderReply(reply, newLeader);
+
+      reply = client.send(new SimpleMessage("m3"));
+      Assert.assertTrue(reply.isSuccess());
+    }
+  }
+
+  @Test
+  public void testNotLeaderExceptionWithReconf() throws Exception {
+    Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));
+
+    final String leaderId = cluster.getLeader().getId();
+    // try-with-resources: the client is closed even when an assertion fails
+    try (final RaftClient client = cluster.createClient("client", leaderId)) {
+      // enforce leader change
+      final String newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
+      Assert.assertNotEquals(leaderId, newLeader);
+
+      // also add two new peers
+      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
+          new String[]{"ss1", "ss2"}, true);
+      // trigger setConfiguration
+      LOG.info("Start changing the configuration: {}",
+          Arrays.asList(change.allPeersInNewConf));
+      try(final RaftClient c2 = cluster.createClient("client2", newLeader)) {
+        final RaftClientReply confReply = c2.setConfiguration(change.allPeersInNewConf);
+        Assert.assertTrue(confReply.isSuccess());
+      }
+      LOG.info(cluster.printServers());
+
+      final RaftClientRequestSender rpc = ((RaftClientImpl)client).getRequestSender();
+      RaftClientReply reply = sendRequestWithRetry(rpc, leaderId, "m1");
+      assertNotLeaderReply(reply, newLeader);
+
+      // the peers carried by the reply must match the peers of the cluster
+      final Collection<RaftPeer> peers = cluster.getPeers();
+      final RaftPeer[] peersFromReply = reply.getNotLeaderException().getPeers();
+      Assert.assertEquals(peers.size(), peersFromReply.length);
+      for (RaftPeer p : peersFromReply) {
+        Assert.assertTrue(peers.contains(p));
+      }
+
+      reply = client.send(new SimpleMessage("m2"));
+      Assert.assertTrue(reply.isSuccess());
+    }
+  }
+}
