http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java b/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java deleted file mode 100644 index 293e1a4..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java +++ /dev/null @@ -1,327 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.storage; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.commons.io.Charsets; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.server.impl.ConfigurationManager; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.impl.RaftServerConstants; -import org.apache.raft.server.storage.RaftStorageDirectory.LogPathAndIndex; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.util.AutoCloseableLock; -import org.apache.raft.util.CodeInjectionForTesting; - -import java.io.File; -import java.io.IOException; -import java.util.Iterator; -import java.util.List; - -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT; -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY; - -/** - * The RaftLog implementation that writes log entries into segmented files on - * local disk. - * - * The max log segment size is 8MB. The real log segment size may not be - * exactly equal to this limit. If a log entry's size exceeds 8MB, this entry - * will be stored in a single segment. - * - * There are two types of segments: closed segment and open segment. The former - * is named as "log_startindex-endindex", the latter is named as - * "log_inprogress_startindex". - * - * There can be multiple closed segments but there is at most one open segment. - * When the open segment reaches the size limit, or the log term increases, we - * close the open segment and start a new open segment. A closed segment cannot - * be appended anymore, but it can be truncated in case that a follower's log is - * inconsistent with the current leader. - * - * Every closed segment should be non-empty, i.e., it should contain at least - * one entry. - * - * There should not be any gap between segments. 
The first segment may not start - * from index 0 since there may be snapshots as log compaction. The last index - * in segments should be no smaller than the last index of snapshot, otherwise - * we may have hole when append further log. - */ -public class SegmentedRaftLog extends RaftLog { - static final String HEADER_STR = "RAFTLOG1"; - static final byte[] HEADER_BYTES = HEADER_STR.getBytes(Charsets.UTF_8); - - /** - * I/O task definitions. - */ - static abstract class Task { - private boolean done = false; - - synchronized void done() { - done = true; - notifyAll(); - } - - synchronized void waitForDone() throws InterruptedException { - while (!done) { - wait(); - } - } - - abstract void execute() throws IOException; - - abstract long getEndIndex(); - - @Override - public String toString() { - return getClass().getSimpleName() + "-" + getEndIndex(); - } - } - private static final ThreadLocal<Task> myTask = new ThreadLocal<>(); - - private final RaftStorage storage; - private final RaftLogCache cache; - private final RaftLogWorker fileLogWorker; - private final long segmentMaxSize; - - public SegmentedRaftLog(String selfId, RaftServerImpl server, RaftStorage storage, - long lastIndexInSnapshot, RaftProperties properties) throws IOException { - super(selfId); - this.storage = storage; - this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, - RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT); - cache = new RaftLogCache(); - fileLogWorker = new RaftLogWorker(server, storage, properties); - lastCommitted.set(lastIndexInSnapshot); - } - - @Override - public void open(ConfigurationManager confManager, long lastIndexInSnapshot) - throws IOException { - loadLogSegments(confManager, lastIndexInSnapshot); - File openSegmentFile = null; - if (cache.getOpenSegment() != null) { - openSegmentFile = storage.getStorageDir() - .getOpenLogFile(cache.getOpenSegment().getStartIndex()); - } - fileLogWorker.start(Math.max(cache.getEndIndex(), lastIndexInSnapshot), - 
openSegmentFile); - super.open(confManager, lastIndexInSnapshot); - } - - @Override - public long getStartIndex() { - return cache.getStartIndex(); - } - - private void loadLogSegments(ConfigurationManager confManager, - long lastIndexInSnapshot) throws IOException { - try(AutoCloseableLock writeLock = writeLock()) { - List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles(); - for (LogPathAndIndex pi : paths) { - LogSegment logSegment = parseLogSegment(pi, confManager); - cache.addSegment(logSegment); - } - - // if the largest index is smaller than the last index in snapshot, we do - // not load the log to avoid holes between log segments. This may happen - // when the local I/O worker is too slow to persist log (slower than - // committing the log and taking snapshot) - if (!cache.isEmpty() && cache.getEndIndex() < lastIndexInSnapshot) { - LOG.warn("End log index {} is smaller than last index in snapshot {}", - cache.getEndIndex(), lastIndexInSnapshot); - cache.clear(); - // TODO purge all segment files - } - } - } - - private LogSegment parseLogSegment(LogPathAndIndex pi, - ConfigurationManager confManager) throws IOException { - final boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX; - return LogSegment.loadSegment(pi.path.toFile(), pi.startIndex, pi.endIndex, - isOpen, confManager); - } - - @Override - public LogEntryProto get(long index) { - checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - return cache.getEntry(index); - } - } - - @Override - public LogEntryProto[] getEntries(long startIndex, long endIndex) { - checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - return cache.getEntries(startIndex, endIndex); - } - } - - @Override - public LogEntryProto getLastEntry() { - checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - return cache.getLastEntry(); - } - } - - /** - * The method, along with {@link #appendEntry} and - * {@link #append(LogEntryProto...)} need protection 
of RaftServer's lock. - */ - @Override - void truncate(long index) { - checkLogState(); - try(AutoCloseableLock writeLock = writeLock()) { - RaftLogCache.TruncationSegments ts = cache.truncate(index); - if (ts != null) { - Task task = fileLogWorker.truncate(ts); - myTask.set(task); - } - } - } - - @Override - void appendEntry(LogEntryProto entry) { - checkLogState(); - try(AutoCloseableLock writeLock = writeLock()) { - final LogSegment currentOpenSegment = cache.getOpenSegment(); - if (currentOpenSegment == null) { - cache.addSegment(LogSegment.newOpenSegment(entry.getIndex())); - fileLogWorker.startLogSegment(getNextIndex()); - } else if (isSegmentFull(currentOpenSegment, entry)) { - cache.rollOpenSegment(true); - fileLogWorker.rollLogSegment(currentOpenSegment); - } else if (currentOpenSegment.numOfEntries() > 0 && - currentOpenSegment.getLastRecord().entry.getTerm() != entry.getTerm()) { - // the term changes - final long currentTerm = currentOpenSegment.getLastRecord().entry - .getTerm(); - Preconditions.checkState(currentTerm < entry.getTerm(), - "open segment's term %s is larger than the new entry's term %s", - currentTerm, entry.getTerm()); - cache.rollOpenSegment(true); - fileLogWorker.rollLogSegment(currentOpenSegment); - } - - cache.appendEntry(entry); - myTask.set(fileLogWorker.writeLogEntry(entry)); - } - } - - private boolean isSegmentFull(LogSegment segment, LogEntryProto entry) { - if (segment.getTotalSize() >= segmentMaxSize) { - return true; - } else { - final long entrySize = LogSegment.getEntrySize(entry); - // if entry size is greater than the max segment size, write it directly - // into the current segment - return entrySize <= segmentMaxSize && - segment.getTotalSize() + entrySize > segmentMaxSize; - } - } - - @Override - public void append(LogEntryProto... 
entries) { - checkLogState(); - if (entries == null || entries.length == 0) { - return; - } - try(AutoCloseableLock writeLock = writeLock()) { - Iterator<LogEntryProto> iter = cache.iterator(entries[0].getIndex()); - int index = 0; - long truncateIndex = -1; - for (; iter.hasNext() && index < entries.length; index++) { - LogEntryProto storedEntry = iter.next(); - Preconditions.checkState( - storedEntry.getIndex() == entries[index].getIndex(), - "The stored entry's index %s is not consistent with" + - " the received entries[%s]'s index %s", storedEntry.getIndex(), - index, entries[index].getIndex()); - - if (storedEntry.getTerm() != entries[index].getTerm()) { - // we should truncate from the storedEntry's index - truncateIndex = storedEntry.getIndex(); - break; - } - } - if (truncateIndex != -1) { - // truncate from truncateIndex - truncate(truncateIndex); - } - // append from entries[index] - for (int i = index; i < entries.length; i++) { - appendEntry(entries[i]); - } - } - } - - @Override - public void logSync() throws InterruptedException { - CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null); - final Task task = myTask.get(); - if (task != null) { - task.waitForDone(); - } - } - - @Override - public long getLatestFlushedIndex() { - return fileLogWorker.getFlushedIndex(); - } - - /** - * {@inheritDoc} - * - * This operation is protected by the RaftServer's lock - */ - @Override - public void writeMetadata(long term, String votedFor) throws IOException { - storage.getMetaFile().set(term, votedFor); - } - - @Override - public Metadata loadMetadata() throws IOException { - return new Metadata(storage.getMetaFile().getVotedFor(), - storage.getMetaFile().getTerm()); - } - - @Override - public void syncWithSnapshot(long lastSnapshotIndex) { - fileLogWorker.syncWithSnapshot(lastSnapshotIndex); - // TODO purge log files and normal/tmp/corrupt snapshot files - // if the last index in snapshot is larger than the index of the last - // log entry, we should 
delete all the log entries and their cache to avoid - // gaps between log segments. - } - - @Override - public void close() throws IOException { - super.close(); - fileLogWorker.close(); - storage.close(); - } - - @VisibleForTesting - RaftLogCache getRaftLogCache() { - return cache; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/SnapshotManager.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/SnapshotManager.java b/raft-server/src/main/java/org/apache/raft/server/storage/SnapshotManager.java deleted file mode 100644 index 8ab2833..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/SnapshotManager.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.storage; - -import com.google.common.base.Preconditions; -import org.apache.raft.io.MD5Hash; -import org.apache.raft.shaded.proto.RaftProtos.FileChunkProto; -import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotRequestProto; -import org.apache.raft.statemachine.SnapshotInfo; -import org.apache.raft.statemachine.StateMachine; -import org.apache.raft.util.FileUtils; -import org.apache.raft.util.MD5FileUtil; -import org.apache.raft.util.RaftUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.channels.FileChannel; - -/** - * Manage snapshots of a raft peer. - * TODO: snapshot should be treated as compaction log thus can be merged into - * RaftLog. In this way we can have a unified getLastTermIndex interface. - */ -public class SnapshotManager { - private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class); - - private final RaftStorage storage; - private final String selfId; - - public SnapshotManager(RaftStorage storage, String selfId) - throws IOException { - this.storage = storage; - this.selfId = selfId; - } - - public void installSnapshot(StateMachine stateMachine, - InstallSnapshotRequestProto request) throws IOException { - final long lastIncludedIndex = request.getTermIndex().getIndex(); - final RaftStorageDirectory dir = storage.getStorageDir(); - - File tmpDir = dir.getNewTempDir(); - tmpDir.mkdirs(); - tmpDir.deleteOnExit(); - - LOG.info("Installing snapshot:{}, to tmp dir:{}", request, tmpDir); - - // TODO: Make sure that subsequent requests for the same installSnapshot are coming in order, - // and are not lost when whole request cycle is done. 
Check requestId and requestIndex here - - for (FileChunkProto chunk : request.getFileChunksList()) { - SnapshotInfo pi = stateMachine.getLatestSnapshot(); - if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) { - throw new IOException("There exists snapshot file " - + pi.getFiles() + " in " + selfId - + " with endIndex >= lastIncludedIndex " + lastIncludedIndex); - } - - String fileName = chunk.getFilename(); // this is relative to the root dir - // TODO: assumes flat layout inside SM dir - File tmpSnapshotFile = new File(tmpDir, - new File(dir.getRoot(), fileName).getName()); - - FileOutputStream out = null; - try { - // if offset is 0, delete any existing temp snapshot file if it has the - // same last index. - if (chunk.getOffset() == 0) { - if (tmpSnapshotFile.exists()) { - FileUtils.fullyDelete(tmpSnapshotFile); - } - // create the temp snapshot file and put padding inside - out = new FileOutputStream(tmpSnapshotFile); - } else { - Preconditions.checkState(tmpSnapshotFile.exists()); - out = new FileOutputStream(tmpSnapshotFile, true); - FileChannel fc = out.getChannel(); - fc.position(chunk.getOffset()); - } - - // write data to the file - out.write(chunk.getData().toByteArray()); - } finally { - RaftUtils.cleanup(null, out); - } - - // rename the temp snapshot file if this is the last chunk. also verify - // the md5 digest and create the md5 meta-file. 
- if (chunk.getDone()) { - final MD5Hash expectedDigest = - new MD5Hash(chunk.getFileDigest().toByteArray()); - // calculate the checksum of the snapshot file and compare it with the - // file digest in the request - MD5Hash digest = MD5FileUtil.computeMd5ForFile(tmpSnapshotFile); - if (!digest.equals(expectedDigest)) { - LOG.warn("The snapshot md5 digest {} does not match expected {}", - digest, expectedDigest); - // rename the temp snapshot file to .corrupt -// NativeIO.renameTo(tmpSnapshotFile, // TODO: -// dir.getCorruptSnapshotFile(lastIncludedTerm, lastIncludedIndex)); - throw new IOException("MD5 mismatch for snapshot-" + lastIncludedIndex - + " installation"); - } else { - MD5FileUtil.saveMD5File(tmpSnapshotFile, digest); - } - } - } - - if (request.getDone()) { - LOG.info("Install snapshot is done, renaming tnp dir:{} to:{}", - tmpDir, dir.getStateMachineDir()); - dir.getStateMachineDir().delete(); - tmpDir.renameTo(dir.getStateMachineDir()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java b/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java deleted file mode 100644 index ccc52c7..0000000 --- a/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.raft.statemachine; - -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.Message; -import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.protocol.RaftClientRequest; -import org.apache.raft.server.impl.RaftConfiguration; -import org.apache.raft.server.impl.RaftServerConstants; -import org.apache.raft.server.storage.RaftStorage; -import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto; -import org.apache.raft.util.LifeCycle; - -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.CompletableFuture; - -/** - * Base implementation for StateMachines. 
- */ -public class BaseStateMachine implements StateMachine { - - protected RaftProperties properties; - protected RaftStorage storage; - protected RaftConfiguration raftConf; - protected final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName()); - - @Override - public LifeCycle.State getLifeCycleState() { - return lifeCycle.getCurrentState(); - } - - @Override - public void initialize(String id, RaftProperties properties, RaftStorage storage) - throws IOException { - lifeCycle.setName(getClass().getSimpleName() + ":" + id); - this.properties = properties; - this.storage = storage; - } - - @Override - public void setRaftConfiguration(RaftConfiguration conf) { - this.raftConf = conf; - } - - @Override - public RaftConfiguration getRaftConfiguration() { - return this.raftConf; - } - - @Override - public SnapshotInfo getLatestSnapshot() { - return getStateMachineStorage().getLatestSnapshot(); - } - - @Override - public void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException { - // do nothing - } - - public void pause() { - } - - @Override - public void reinitialize(String id, RaftProperties properties, RaftStorage storage) - throws IOException { - } - - @Override - public TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException { - return trx; - } - - @Override - public CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException { - // return the same message contained in the entry - Message msg = () -> trx.getLogEntry().get().getSmLogEntry().getData(); - return CompletableFuture.completedFuture(msg); - } - - @Override - public long takeSnapshot() throws IOException { - return RaftServerConstants.INVALID_LOG_INDEX; - } - - @Override - public StateMachineStorage getStateMachineStorage() { - return new StateMachineStorage() { - @Override - public void init(RaftStorage raftStorage) throws IOException { - } - - @Override - public SnapshotInfo getLatestSnapshot() { - return 
null; - } - - @Override - public void format() throws IOException { - } - }; - } - - @Override - public CompletableFuture<RaftClientReply> query( - RaftClientRequest request) { - return null; - } - - @Override - public TransactionContext startTransaction(RaftClientRequest request) - throws IOException { - return new TransactionContext(this, request, - SMLogEntryProto.newBuilder() - .setData(request.getMessage().getContent()) - .build()); - } - - @Override - public TransactionContext cancelTransaction(TransactionContext trx) throws IOException { - return trx; - } - - @Override - public TransactionContext preAppendTransaction(TransactionContext trx) throws IOException { - return trx; - } - - @Override - public void close() throws IOException { - // do nothing - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/FileListSnapshotInfo.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/FileListSnapshotInfo.java b/raft-server/src/main/java/org/apache/raft/statemachine/FileListSnapshotInfo.java deleted file mode 100644 index b65fc13..0000000 --- a/raft-server/src/main/java/org/apache/raft/statemachine/FileListSnapshotInfo.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.statemachine; - -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.server.storage.FileInfo; - -import java.util.Collections; -import java.util.List; - -/** - * Each snapshot has a list of files. - * - * The objects of this class are immutable. - */ -public class FileListSnapshotInfo implements SnapshotInfo { - private final TermIndex termIndex; - private final List<FileInfo> files; - - public FileListSnapshotInfo(List<FileInfo> files, long term, long index) { - this.termIndex = TermIndex.newTermIndex(term, index); - this.files = Collections.unmodifiableList(files); - } - - @Override - public TermIndex getTermIndex() { - return termIndex; - } - - @Override - public long getTerm() { - return termIndex.getTerm(); - } - - @Override - public long getIndex() { - return termIndex.getIndex(); - } - - @Override - public List<FileInfo> getFiles() { - return files; - } - - @Override - public String toString() { - return termIndex + ":" + files; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java b/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java deleted file mode 100644 index a779f98..0000000 --- a/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java +++ /dev/null @@ 
-1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.statemachine; - - -import com.google.common.annotations.VisibleForTesting; -import org.apache.raft.io.MD5Hash; -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.server.storage.FileInfo; -import org.apache.raft.server.storage.RaftStorage; -import org.apache.raft.util.AtomicFileOutputStream; -import org.apache.raft.util.MD5FileUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * A StateMachineStorage that stores the snapshot in a single file. 
- */ -public class SimpleStateMachineStorage implements StateMachineStorage { - - private static final Logger LOG = LoggerFactory.getLogger(SimpleStateMachineStorage.class); - - static final String SNAPSHOT_FILE_PREFIX = "snapshot"; - static final String CORRUPT_SNAPSHOT_FILE_SUFFIX = ".corrupt"; - /** snapshot.term_index */ - static final Pattern SNAPSHOT_REGEX = - Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)"); - - private RaftStorage raftStorage; - private File smDir = null; - - private volatile SingleFileSnapshotInfo currentSnapshot = null; - - public void init(RaftStorage raftStorage) throws IOException { - this.raftStorage = raftStorage; - this.smDir = raftStorage.getStorageDir().getStateMachineDir(); - loadLatestSnapshot(); - } - - @Override - public void format() throws IOException { - // TODO - } - - @VisibleForTesting - public static TermIndex getTermIndexFromSnapshotFile(File file) { - final String name = file.getName(); - final Matcher m = SNAPSHOT_REGEX.matcher(name); - if (!m.matches()) { - throw new IllegalArgumentException("File \"" + file - + "\" does not match snapshot file name pattern \"" - + SNAPSHOT_REGEX + "\""); - } - final long term = Long.parseLong(m.group(1)); - final long index = Long.parseLong(m.group(2)); - return TermIndex.newTermIndex(term, index); - } - - protected static String getTmpSnapshotFileName(long term, long endIndex) { - return getSnapshotFileName(term, endIndex) + AtomicFileOutputStream.TMP_EXTENSION; - } - - protected static String getCorruptSnapshotFileName(long term, long endIndex) { - return getSnapshotFileName(term, endIndex) + CORRUPT_SNAPSHOT_FILE_SUFFIX; - } - - public File getSnapshotFile(long term, long endIndex) { - return new File(smDir, getSnapshotFileName(term, endIndex)); - } - - protected File getTmpSnapshotFile(long term, long endIndex) { - return new File(smDir, getTmpSnapshotFileName(term, endIndex)); - } - - protected File getCorruptSnapshotFile(long term, long endIndex) { - return new 
File(smDir, getCorruptSnapshotFileName(term, endIndex)); - } - - public SingleFileSnapshotInfo findLatestSnapshot() throws IOException { - SingleFileSnapshotInfo latest = null; - try (DirectoryStream<Path> stream = - Files.newDirectoryStream(smDir.toPath())) { - for (Path path : stream) { - Matcher matcher = SNAPSHOT_REGEX.matcher(path.getFileName().toString()); - if (matcher.matches()) { - final long endIndex = Long.parseLong(matcher.group(2)); - if (latest == null || endIndex > latest.getIndex()) { - final long term = Long.parseLong(matcher.group(1)); - MD5Hash fileDigest = MD5FileUtil.readStoredMd5ForFile(path.toFile()); - final FileInfo fileInfo = new FileInfo(path, fileDigest); - latest = new SingleFileSnapshotInfo(fileInfo, term, endIndex); - } - } - } - } - return latest; - } - - public void loadLatestSnapshot() throws IOException { - this.currentSnapshot = findLatestSnapshot(); - } - - public static String getSnapshotFileName(long term, long endIndex) { - return SNAPSHOT_FILE_PREFIX + "." + term + "_" + endIndex; - } - - @Override - public SingleFileSnapshotInfo getLatestSnapshot() { - return currentSnapshot; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/SingleFileSnapshotInfo.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/SingleFileSnapshotInfo.java b/raft-server/src/main/java/org/apache/raft/statemachine/SingleFileSnapshotInfo.java deleted file mode 100644 index 6b01e17..0000000 --- a/raft-server/src/main/java/org/apache/raft/statemachine/SingleFileSnapshotInfo.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.statemachine; - -import org.apache.raft.server.storage.FileInfo; - -import java.util.Arrays; - -/** - * Each snapshot only has a single file. - * - * The objects of this class are immutable. - */ -public class SingleFileSnapshotInfo extends FileListSnapshotInfo { - public SingleFileSnapshotInfo(FileInfo fileInfo, long term, long endIndex) { - super(Arrays.asList(fileInfo), term, endIndex); - } - - /** @return the file associated with the snapshot. */ - public FileInfo getFile() { - return getFiles().get(0); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/SnapshotInfo.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/SnapshotInfo.java b/raft-server/src/main/java/org/apache/raft/statemachine/SnapshotInfo.java deleted file mode 100644 index 0fdcbc3..0000000 --- a/raft-server/src/main/java/org/apache/raft/statemachine/SnapshotInfo.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.statemachine; - -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.server.storage.FileInfo; - -import java.util.List; - -/** - * SnapshotInfo represents a durable state by the state machine. The state machine implementation is - * responsible for the layout of the snapshot files as well as making the data durable. Latest term, - * latest index, and the raft configuration must be saved together with any data files in the - * snapshot. - */ -public interface SnapshotInfo { - - /** - * Returns the term and index corresponding to this snapshot. - * @return The term and index corresponding to this snapshot. - */ - TermIndex getTermIndex(); - - /** - * Returns the term corresponding to this snapshot. - * @return The term corresponding to this snapshot. - */ - long getTerm(); - - /** - * Returns the index corresponding to this snapshot. - * @return The index corresponding to this snapshot. - */ - long getIndex(); - - /** - * Returns a list of files corresponding to this snapshot. This list should include all - * the files that the state machine keeps in its data directory. This list of files will be - * copied as to other replicas in install snapshot RPCs. - * @return a list of Files corresponding to the this snapshot. 
- */ - List<FileInfo> getFiles(); -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/StateMachine.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/StateMachine.java b/raft-server/src/main/java/org/apache/raft/statemachine/StateMachine.java deleted file mode 100644 index 3dedf88..0000000 --- a/raft-server/src/main/java/org/apache/raft/statemachine/StateMachine.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.statemachine; - -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.Message; -import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.protocol.RaftClientRequest; -import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.server.impl.RaftConfiguration; -import org.apache.raft.server.storage.RaftStorage; -import org.apache.raft.util.LifeCycle; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.CompletableFuture; - -/** - * StateMachine is the entry point for the custom implementation of replicated state as defined in - * the "State Machine Approach" in the literature - * (see https://en.wikipedia.org/wiki/State_machine_replication). - */ -public interface StateMachine extends Closeable { - /** - * Initializes the State Machine with the given properties and storage. The state machine is - * responsible reading the latest snapshot from the file system (if any) and initialize itself - * with the latest term and index there including all the edits. - */ - void initialize(String id, RaftProperties properties, RaftStorage storage) - throws IOException; - - /** - * Returns the lifecycle state for this StateMachine. - * @return the lifecycle state. - */ - LifeCycle.State getLifeCycleState(); - - /** - * Pauses the state machine. On return, the state machine should have closed all open files so - * that a new snapshot can be installed. - */ - void pause(); - - /** - * Re-initializes the State Machine in PAUSED state with the given properties and storage. The - * state machine is responsible reading the latest snapshot from the file system (if any) and - * initialize itself with the latest term and index there including all the edits. - */ - void reinitialize(String id, RaftProperties properties, RaftStorage storage) - throws IOException; - - /** - * Dump the in-memory state into a snapshot file in the RaftStorage. 
The - * StateMachine implementation can decide 1) its own snapshot format, 2) when - * a snapshot is taken, and 3) how the snapshot is taken (e.g., whether the - * snapshot blocks the state machine, and whether to purge log entries after - * a snapshot is done). - * - * In the meanwhile, when the size of raft log outside of the latest snapshot - * exceeds certain threshold, the RaftServer may choose to trigger a snapshot - * if {@link RaftServerConfigKeys#RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY} is - * enabled. - * - * The snapshot should include the latest raft configuration. - * - * @return the largest index of the log entry that has been applied to the - * state machine and also included in the snapshot. Note the log purge - * should be handled separately. - */ - // TODO: refactor this - long takeSnapshot() throws IOException; - - /** - * Record the RaftConfiguration in the state machine. The RaftConfiguration - * should also be stored in the snapshot. - */ - void setRaftConfiguration(RaftConfiguration conf); - - /** - * @return the latest raft configuration recorded in the state machine. - */ - RaftConfiguration getRaftConfiguration(); - - /** - * @return StateMachineStorage to interact with the durability guarantees provided by the - * state machine. - */ - StateMachineStorage getStateMachineStorage(); - - /** - * Returns the information for the latest durable snapshot. - */ - SnapshotInfo getLatestSnapshot(); - - /** - * Query the state machine. The request must be read-only. - * TODO: extend RaftClientRequest to have a read-only request subclass. - */ - CompletableFuture<RaftClientReply> query(RaftClientRequest request); - - /** - * Validate/pre-process the incoming update request in the state machine. - * @return the content to be written to the log entry. Null means the request - * should be rejected. 
- * @throws IOException thrown by the state machine while validation - */ - TransactionContext startTransaction(RaftClientRequest request) - throws IOException; - - /** - * This is called before the transaction passed from the StateMachine is appended to the raft log. - * This method will be called from log append and having the same strict serial order that the - * transactions will have in the RAFT log. Since this is called serially in the critical path of - * log append, it is important to do only required operations here. - * @return The Transaction context. - */ - TransactionContext preAppendTransaction(TransactionContext trx) throws IOException; - - /** - * Called to notify the state machine that the Transaction passed cannot be appended (or synced). - * The exception field will indicate whether there was an exception or not. - * @param trx the transaction to cancel - * @return cancelled transaction - */ - TransactionContext cancelTransaction(TransactionContext trx) throws IOException; - - /** - * Called for transactions that have been committed to the RAFT log. This step is called - * sequentially in strict serial order that the transactions have been committed in the log. - * The SM is expected to do only necessary work, and leave the actual apply operation to the - * applyTransaction calls that can happen concurrently. - * @param trx the transaction state including the log entry that has been committed to a quorum - * of the raft peers - * @return The Transaction context. - */ - TransactionContext applyTransactionSerial(TransactionContext trx) throws IOException; - - /** - * Apply a committed log entry to the state machine. This method can be called concurrently with - * the other calls, and there is no guarantee that the calls will be ordered according to the - * log commit order. 
- * @param trx the transaction state including the log entry that has been committed to a quorum - * of the raft peers - */ - // TODO: We do not need to return CompletableFuture - CompletableFuture<Message> applyTransaction(TransactionContext trx) throws IOException; - - /** - * Notify the state machine that the raft peer is no longer leader. - */ - void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/StateMachineStorage.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/StateMachineStorage.java b/raft-server/src/main/java/org/apache/raft/statemachine/StateMachineStorage.java deleted file mode 100644 index 30005f9..0000000 --- a/raft-server/src/main/java/org/apache/raft/statemachine/StateMachineStorage.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.statemachine; - -import org.apache.raft.server.storage.RaftStorage; -import java.io.IOException; - -public interface StateMachineStorage { - - void init(RaftStorage raftStorage) throws IOException; - - /** - * Returns the information for the latest durable snapshot. - */ - SnapshotInfo getLatestSnapshot(); - - // TODO: StateMachine can decide to compact the files independently of concurrent install snapshot - // etc requests. We should have ref counting for the SnapshotInfo with a release mechanism - // so that raft server will release the files after the snapshot file copy in case a compaction - // is waiting for deleting these files. - - void format() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/statemachine/TransactionContext.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/TransactionContext.java b/raft-server/src/main/java/org/apache/raft/statemachine/TransactionContext.java deleted file mode 100644 index 675ada9..0000000 --- a/raft-server/src/main/java/org/apache/raft/statemachine/TransactionContext.java +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.statemachine; - -import org.apache.raft.protocol.RaftClientRequest; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto; - -import java.io.IOException; -import java.util.Collection; -import java.util.Optional; - -/** - * Context for a transaction. - * The transaction might have originated from a client request, or it - * maybe coming from another replica of the state machine through the RAFT log. - * {@link TransactionContext} can be created from - * either the {@link StateMachine} or the state machine updater. - * - * In the first case, the {@link StateMachine} is a leader. When it receives - * a {@link StateMachine#startTransaction(RaftClientRequest)} request, it returns - * a {@link TransactionContext} with the changes from the {@link StateMachine}. - * The same context will be passed back to the {@link StateMachine} - * via the {@link StateMachine#applyTransaction(TransactionContext)} call - * or the {@link StateMachine#notifyNotLeader(Collection)} call. - * - * In the second case, the {@link StateMachine} is a follower. - * The {@link TransactionContext} will be a committed entry coming from - * the RAFT log from the leader. - */ -public class TransactionContext { - - /** The {@link StateMachine} that originated the transaction. */ - private final StateMachine stateMachine; - - /** Original request from the client */ - private Optional<RaftClientRequest> clientRequest = Optional.empty(); - - /** Exception from the {@link StateMachine} or from the log */ - private Optional<Exception> exception = Optional.empty(); - - /** Data from the {@link StateMachine} */ - private Optional<SMLogEntryProto> smLogEntryProto = Optional.empty(); - - /** - * Context specific to the state machine. 
- * The {@link StateMachine} can use this object to carry state between - * {@link StateMachine#startTransaction(RaftClientRequest)} and - * {@link StateMachine#applyTransaction(TransactionContext)}. - */ - private Optional<Object> stateMachineContext = Optional.empty(); - - /** - * Whether to commit the transaction to the RAFT Log. - * In some cases the {@link StateMachine} may want to indicate - * that the transaction should not be committed - */ - private boolean shouldCommit = true; - - /** Committed LogEntry. */ - private Optional<LogEntryProto> logEntry = Optional.empty(); - - private TransactionContext(StateMachine stateMachine) { - this.stateMachine = stateMachine; - } - - /** The same as this(stateMachine, clientRequest, smLogEntryProto, null). */ - public TransactionContext( - StateMachine stateMachine, RaftClientRequest clientRequest, - SMLogEntryProto smLogEntryProto) { - this(stateMachine, clientRequest, smLogEntryProto, null); - } - - /** - * Construct a {@link TransactionContext} from a client request. - * Used by the state machine to start a transaction - * and send the Log entry representing the transaction data - * to be applied to the raft log. - */ - public TransactionContext( - StateMachine stateMachine, RaftClientRequest clientRequest, - SMLogEntryProto smLogEntryProto, Object stateMachineContext) { - this(stateMachine); - this.clientRequest = Optional.of(clientRequest); - this.smLogEntryProto = Optional.ofNullable(smLogEntryProto); - this.stateMachineContext = Optional.ofNullable(stateMachineContext); - } - - /** The same as this(stateMachine, clientRequest, exception, null). */ - public TransactionContext( - StateMachine stateMachine, RaftClientRequest clientRequest, - Exception exception) { - this(stateMachine, clientRequest, exception, null); - } - - /** - * Construct a {@link TransactionContext} from a client request to signal - * an exception so that the RAFT server will fail the request on behalf - * of the {@link StateMachine}. 
- */ - public TransactionContext( - StateMachine stateMachine, RaftClientRequest clientRequest, - Exception exception, Object stateMachineContext) { - this(stateMachine); - this.clientRequest = Optional.of(clientRequest); - this.exception = Optional.of(exception); - this.stateMachineContext = Optional.ofNullable(stateMachineContext); - } - - /** - * Construct a {@link TransactionContext} from a {@link LogEntryProto}. - * Used by followers for applying committed entries to the state machine. - * @param logEntry the log entry to be applied - */ - public TransactionContext(StateMachine stateMachine, LogEntryProto logEntry) { - this(stateMachine); - this.smLogEntryProto = Optional.of(logEntry.getSmLogEntry()); - this.logEntry = Optional.of(logEntry); - } - - public Optional<RaftClientRequest> getClientRequest() { - return this.clientRequest; - } - - public Optional<SMLogEntryProto> getSMLogEntry() { - return this.smLogEntryProto; - } - - public Optional<Exception> getException() { - return this.exception; - } - - public TransactionContext setStateMachineContext(Object stateMachineContext) { - this.stateMachineContext = Optional.ofNullable(stateMachineContext); - return this; - } - - public Optional<Object> getStateMachineContext() { - return stateMachineContext; - } - - public TransactionContext setLogEntry(LogEntryProto logEntry) { - this.logEntry = Optional.of(logEntry); - return this; - } - - public TransactionContext setSmLogEntryProto(SMLogEntryProto smLogEntryProto) { - this.smLogEntryProto = Optional.of(smLogEntryProto); - return this; - } - - public Optional<LogEntryProto> getLogEntry() { - return logEntry; - } - - private TransactionContext setException(IOException ioe) { - assert !this.exception.isPresent(); - this.exception = Optional.of(ioe); - return this; - } - - public TransactionContext setShouldCommit(boolean shouldCommit) { - this.shouldCommit = shouldCommit; - return this; - } - - public boolean shouldCommit() { - // TODO: Hook this up in the server 
to bypass the RAFT Log and send back a response to client - return this.shouldCommit; - } - - // proxy StateMachine methods. We do not want to expose the SM to the RaftLog - - /** - * This is called before the transaction passed from the StateMachine is appended to the raft log. - * This method will be called from log append and having the same strict serial order that the - * Transactions will have in the RAFT log. Since this is called serially in the critical path of - * log append, it is important to do only required operations here. - * @return The Transaction context. - */ - public TransactionContext preAppendTransaction() throws IOException { - return stateMachine.preAppendTransaction(this); - } - - /** - * Called to notify the state machine that the Transaction passed cannot be appended (or synced). - * The exception field will indicate whether there was an exception or not. - * @return cancelled transaction - */ - public TransactionContext cancelTransaction() throws IOException { - // TODO: This is not called from Raft server / log yet. When an IOException happens, we should - // call this to let the SM know that Transaction cannot be synced - return stateMachine.cancelTransaction(this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java b/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java deleted file mode 100644 index c66ef8f..0000000 --- a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java +++ /dev/null @@ -1,444 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft; - -import com.google.common.base.Preconditions; -import org.apache.raft.client.RaftClient; -import org.apache.raft.client.RaftClientRequestSender; -import org.apache.raft.client.impl.RaftClientImpl; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.server.impl.DelayLocalExecutionInjection; -import org.apache.raft.server.impl.RaftConfiguration; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.RaftServerRpc; -import org.apache.raft.server.storage.MemoryRaftLog; -import org.apache.raft.server.storage.RaftLog; -import org.apache.raft.statemachine.BaseStateMachine; -import org.apache.raft.statemachine.StateMachine; -import org.apache.raft.util.ExitUtils; -import org.apache.raft.util.FileUtils; -import org.apache.raft.util.RaftUtils; -import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.*; -import java.util.stream.Collectors; - -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT; - -public abstract class MiniRaftCluster { - public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class); - public static final DelayLocalExecutionInjection logSyncDelay = - new 
DelayLocalExecutionInjection(RaftLog.LOG_SYNC); - - public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName(); - public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class"; - public static final Class<? extends StateMachine> STATEMACHINE_CLASS_DEFAULT = BaseStateMachine.class; - - public static abstract class Factory<CLUSTER extends MiniRaftCluster> { - public abstract CLUSTER newCluster( - String[] ids, RaftProperties prop, boolean formatted) - throws IOException; - - public CLUSTER newCluster( - int numServer, RaftProperties prop, boolean formatted) - throws IOException { - return newCluster(generateIds(numServer, 0), prop, formatted); - } - } - - public static abstract class RpcBase extends MiniRaftCluster { - public RpcBase(String[] ids, RaftProperties properties, boolean formatted) { - super(ids, properties, formatted); - } - - protected abstract RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException; - - @Override - protected void setPeerRpc() throws IOException { - for (RaftPeer p : conf.getPeers()) { - setPeerRpc(p); - } - } - - @Override - public void restartServer(String id, boolean format) throws IOException { - super.restartServer(id, format); - setPeerRpc(conf.getPeer(id)).start(); - } - - @Override - public void setBlockRequestsFrom(String src, boolean block) { - RaftTestUtil.setBlockRequestsFrom(src, block); - } - } - - public static class PeerChanges { - public final RaftPeer[] allPeersInNewConf; - public final RaftPeer[] newPeers; - public final RaftPeer[] removedPeers; - - public PeerChanges(RaftPeer[] all, RaftPeer[] newPeers, RaftPeer[] removed) { - this.allPeersInNewConf = all; - this.newPeers = newPeers; - this.removedPeers = removed; - } - } - - public static RaftConfiguration initConfiguration(String[] ids) { - return RaftConfiguration.newBuilder() - .setConf(Arrays.stream(ids).map(RaftPeer::new).collect(Collectors.toList())) - .build(); - } - - private static String getBaseDirectory() { - 
return System.getProperty("test.build.data", "target/test/data") + "/raft/"; - } - - private static void formatDir(String dirStr) { - final File serverDir = new File(dirStr); - Preconditions.checkState(FileUtils.fullyDelete(serverDir), - "Failed to format directory %s", dirStr); - LOG.info("Formatted directory {}", dirStr); - } - - public static String[] generateIds(int numServers, int base) { - String[] ids = new String[numServers]; - for (int i = 0; i < numServers; i++) { - ids[i] = "s" + (i + base); - } - return ids; - } - - protected RaftConfiguration conf; - protected final RaftProperties properties; - private final String testBaseDir; - protected final Map<String, RaftServerImpl> servers = - Collections.synchronizedMap(new LinkedHashMap<>()); - - public MiniRaftCluster(String[] ids, RaftProperties properties, - boolean formatted) { - this.conf = initConfiguration(ids); - this.properties = new RaftProperties(properties); - this.testBaseDir = getBaseDirectory(); - - conf.getPeers().forEach( - p -> servers.put(p.getId(), newRaftServer(p.getId(), conf, formatted))); - - ExitUtils.disableSystemExit(); - } - - protected <RPC extends RaftServerRpc> void init(Map<RaftPeer, RPC> peers) { - LOG.info("peers = " + peers.keySet()); - conf = RaftConfiguration.newBuilder().setConf(peers.keySet()).build(); - for (Map.Entry<RaftPeer, RPC> entry : peers.entrySet()) { - final RaftServerImpl server = servers.get(entry.getKey().getId()); - server.setInitialConf(conf); - server.setServerRpc(entry.getValue()); - } - } - - public void start() { - LOG.info("Starting " + getClass().getSimpleName()); - servers.values().forEach(RaftServerImpl::start); - } - - /** - * start a stopped server again. 
- */ - public void restartServer(String id, boolean format) throws IOException { - killServer(id); - servers.remove(id); - servers.put(id, newRaftServer(id, conf, format)); - } - - public final void restart(boolean format) throws IOException { - servers.values().stream().filter(RaftServerImpl::isAlive) - .forEach(RaftServerImpl::close); - List<String> idList = new ArrayList<>(servers.keySet()); - for (String id : idList) { - servers.remove(id); - servers.put(id, newRaftServer(id, conf, format)); - } - - setPeerRpc(); - start(); - } - - protected abstract void setPeerRpc() throws IOException; - - public int getMaxTimeout() { - return properties.getInt( - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY, - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT); - } - - public RaftConfiguration getConf() { - return conf; - } - - private RaftServerImpl newRaftServer(String id, RaftConfiguration conf, - boolean format) { - final RaftServerImpl s; - try { - final String dirStr = testBaseDir + id; - if (format) { - formatDir(dirStr); - } - properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr); - s = new RaftServerImpl(id, conf, properties, getStateMachine4Test(properties)); - } catch (IOException e) { - throw new RuntimeException(e); - } - return s; - } - - static StateMachine getStateMachine4Test(RaftProperties properties) { - final Class<? 
extends StateMachine> smClass = properties.getClass( - STATEMACHINE_CLASS_KEY, - STATEMACHINE_CLASS_DEFAULT, - StateMachine.class); - return RaftUtils.newInstance(smClass); - } - - public abstract RaftClientRequestSender getRaftClientRequestSender(); - - protected <RPC extends RaftServerRpc> Collection<RaftPeer> addNewPeers( - Map<RaftPeer, RPC> newPeers, Collection<RaftServerImpl> newServers, - boolean startService) throws IOException { - for (Map.Entry<RaftPeer, RPC> entry : newPeers.entrySet()) { - RaftServerImpl server = servers.get(entry.getKey().getId()); - server.setServerRpc(entry.getValue()); - } - if (startService) { - newServers.forEach(RaftServerImpl::start); - } - return new ArrayList<>(newPeers.keySet()); - } - - protected abstract Collection<RaftPeer> addNewPeers( - Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers, - boolean startService) throws IOException; - - public PeerChanges addNewPeers(int number, boolean startNewPeer) - throws IOException { - return addNewPeers(generateIds(number, servers.size()), startNewPeer); - } - - public PeerChanges addNewPeers(String[] ids, - boolean startNewPeer) throws IOException { - LOG.info("Add new peers {}", Arrays.asList(ids)); - Collection<RaftPeer> newPeers = new ArrayList<>(ids.length); - for (String id : ids) { - newPeers.add(new RaftPeer(id)); - } - - // create and add new RaftServers - final List<RaftServerImpl> newServers = new ArrayList<>(ids.length); - for (RaftPeer p : newPeers) { - RaftServerImpl newServer = newRaftServer(p.getId(), conf, true); - Preconditions.checkArgument(!servers.containsKey(p.getId())); - servers.put(p.getId(), newServer); - newServers.add(newServer); - } - - // for hadoop-rpc-enabled peer, we assign inetsocketaddress here - newPeers = addNewPeers(newPeers, newServers, startNewPeer); - - final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]); - newPeers.addAll(conf.getPeers()); - conf = 
RaftConfiguration.newBuilder().setConf(newPeers).setLogEntryIndex(0).build(); - RaftPeer[] p = newPeers.toArray(new RaftPeer[newPeers.size()]); - return new PeerChanges(p, np, new RaftPeer[0]); - } - - public void startServer(String id) { - RaftServerImpl server = servers.get(id); - assert server != null; - server.start(); - } - - private RaftPeer getPeer(RaftServerImpl s) { - return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress()); - } - - /** - * prepare the peer list when removing some peers from the conf - */ - public PeerChanges removePeers(int number, boolean removeLeader, - Collection<RaftPeer> excluded) { - Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers()); - List<RaftPeer> removedPeers = new ArrayList<>(number); - if (removeLeader) { - final RaftPeer leader = getPeer(getLeader()); - assert !excluded.contains(leader); - peers.remove(leader); - removedPeers.add(leader); - } - List<RaftServerImpl> followers = getFollowers(); - for (int i = 0, removed = 0; i < followers.size() && - removed < (removeLeader ? 
number - 1 : number); i++) { - RaftPeer toRemove = getPeer(followers.get(i)); - if (!excluded.contains(toRemove)) { - peers.remove(toRemove); - removedPeers.add(toRemove); - removed++; - } - } - conf = RaftConfiguration.newBuilder().setConf(peers).setLogEntryIndex(0).build(); - RaftPeer[] p = peers.toArray(new RaftPeer[peers.size()]); - return new PeerChanges(p, new RaftPeer[0], - removedPeers.toArray(new RaftPeer[removedPeers.size()])); - } - - public void killServer(String id) { - servers.get(id).close(); - } - - public String printServers() { - StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n"); - for (RaftServerImpl s : servers.values()) { - b.append(" "); - b.append(s).append("\n"); - } - return b.toString(); - } - - public String printAllLogs() { - StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n"); - for (RaftServerImpl s : servers.values()) { - b.append(" "); - b.append(s).append("\n"); - - final RaftLog log = s.getState().getLog(); - if (log instanceof MemoryRaftLog) { - b.append(" "); - b.append(((MemoryRaftLog) log).getEntryString()); - } - } - return b.toString(); - } - - public RaftServerImpl getLeader() { - final List<RaftServerImpl> leaders = new ArrayList<>(); - servers.values().stream() - .filter(s -> s.isAlive() && s.isLeader()) - .forEach(s -> { - if (leaders.isEmpty()) { - leaders.add(s); - } else { - final long leaderTerm = leaders.get(0).getState().getCurrentTerm(); - final long term = s.getState().getCurrentTerm(); - if (term >= leaderTerm) { - if (term > leaderTerm) { - leaders.clear(); - } - leaders.add(s); - } - } - }); - if (leaders.isEmpty()) { - return null; - } else if (leaders.size() != 1) { - Assert.fail(printServers() + leaders.toString() - + "leaders.size() = " + leaders.size() + " != 1"); - } - return leaders.get(0); - } - - public boolean isLeader(String leaderId) throws InterruptedException { - final RaftServerImpl leader = getLeader(); - return leader != null && 
leader.getId().equals(leaderId); - } - - public List<RaftServerImpl> getFollowers() { - return servers.values().stream() - .filter(s -> s.isAlive() && s.isFollower()) - .collect(Collectors.toList()); - } - - public Collection<RaftServerImpl> getServers() { - return servers.values(); - } - - public RaftServerImpl getServer(String id) { - return servers.get(id); - } - - public Collection<RaftPeer> getPeers() { - return getServers().stream().map(s -> - new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress())) - .collect(Collectors.toList()); - } - - public RaftClient createClient(String clientId, String leaderId) { - return new RaftClientImpl(clientId, conf.getPeers(), - getRaftClientRequestSender(), leaderId, properties); - } - - public void shutdown() { - LOG.info("Stopping " + getClass().getSimpleName()); - servers.values().stream().filter(RaftServerImpl::isAlive) - .forEach(RaftServerImpl::close); - - if (ExitUtils.isTerminated()) { - LOG.error("Test resulted in an unexpected exit", - ExitUtils.getFirstExitException()); - throw new AssertionError("Test resulted in an unexpected exit"); - } - } - - /** - * Block all the incoming requests for the peer with leaderId. Also delay - * outgoing or incoming msg for all other peers. - */ - protected abstract void blockQueueAndSetDelay(String leaderId, int delayMs) - throws InterruptedException; - - /** - * Try to enforce the leader of the cluster. - * @param leaderId ID of the targeted leader server. - * @return true if server has been successfully enforced to the leader, false - * otherwise. - */ - public boolean tryEnforceLeader(String leaderId) throws InterruptedException { - // do nothing and see if the given id is already a leader. - if (isLeader(leaderId)) { - return true; - } - - // Blocking all other server's RPC read process to make sure a read takes at - // least ELECTION_TIMEOUT_MIN. In this way when the target leader request a - // vote, all non-leader servers can grant the vote. 
- // Disable the target leader server RPC so that it can request a vote. - blockQueueAndSetDelay(leaderId, RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT); - - // Reopen queues so that the vote can make progress. - blockQueueAndSetDelay(leaderId, 0); - - return isLeader(leaderId); - } - - /** Block/unblock the requests sent from the given source. */ - public abstract void setBlockRequestsFrom(String src, boolean block); -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java b/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java deleted file mode 100644 index ed40bde..0000000 --- a/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft; - -import org.apache.raft.RaftTestUtil.SimpleMessage; -import org.apache.raft.client.RaftClient; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.server.impl.RaftServerImpl; -import org.junit.*; -import org.junit.rules.Timeout; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.apache.raft.RaftTestUtil.waitAndKillLeader; -import static org.apache.raft.RaftTestUtil.waitForLeader; - -public abstract class RaftBasicTests { - public static final Logger LOG = LoggerFactory.getLogger(RaftBasicTests.class); - - public static final int NUM_SERVERS = 5; - - protected static final RaftProperties properties = new RaftProperties(); - - public abstract MiniRaftCluster getCluster(); - - public RaftProperties getProperties() { - return properties; - } - - @Rule - public Timeout globalTimeout = new Timeout(120 * 1000); - - @Before - public void setup() throws IOException { - Assert.assertNull(getCluster().getLeader()); - getCluster().start(); - } - - @After - public void tearDown() { - final MiniRaftCluster cluster = getCluster(); - if (cluster != null) { - cluster.shutdown(); - } - } - - @Test - public void testBasicLeaderElection() throws Exception { - LOG.info("Running testBasicLeaderElection"); - final MiniRaftCluster cluster = getCluster(); - waitAndKillLeader(cluster, true); - waitAndKillLeader(cluster, true); - waitAndKillLeader(cluster, true); - waitAndKillLeader(cluster, false); - } - - @Test - public void testBasicAppendEntries() throws Exception { - LOG.info("Running testBasicAppendEntries"); - final MiniRaftCluster cluster = getCluster(); - RaftServerImpl leader = waitForLeader(cluster); - final long term = leader.getState().getCurrentTerm(); - final String killed = 
cluster.getFollowers().get(3).getId(); - cluster.killServer(killed); - LOG.info(cluster.printServers()); - - final SimpleMessage[] messages = SimpleMessage.create(10); - try(final RaftClient client = cluster.createClient("client", null)) { - for (SimpleMessage message : messages) { - client.send(message); - } - } - - Thread.sleep(cluster.getMaxTimeout() + 100); - LOG.info(cluster.printAllLogs()); - - cluster.getServers().stream().filter(RaftServerImpl::isAlive) - .map(s -> s.getState().getLog().getEntries(1, Long.MAX_VALUE)) - .forEach(e -> RaftTestUtil.assertLogEntries(e, 1, term, messages)); - } - - @Test - public void testEnforceLeader() throws Exception { - LOG.info("Running testEnforceLeader"); - final String leader = "s" + ThreadLocalRandom.current().nextInt(NUM_SERVERS); - LOG.info("enforce leader to " + leader); - final MiniRaftCluster cluster = getCluster(); - waitForLeader(cluster); - waitForLeader(cluster, leader); - } - - static class Client4TestWithLoad extends Thread { - final RaftClient client; - final SimpleMessage[] messages; - - final AtomicInteger step = new AtomicInteger(); - volatile Exception exceptionInClientThread; - - Client4TestWithLoad(RaftClient client, int numMessages) { - this.client = client; - this.messages = SimpleMessage.create(numMessages, client.getId()); - } - - boolean isRunning() { - return step.get() < messages.length && exceptionInClientThread == null; - } - - @Override - public void run() { - try { - for (; isRunning(); ) { - client.send(messages[step.getAndIncrement()]); - } - client.close(); - } catch (IOException ioe) { - exceptionInClientThread = ioe; - } - } - } - - @Test - public void testWithLoad() throws Exception { - testWithLoad(10, 500); - } - - private void testWithLoad(final int numClients, final int numMessages) - throws Exception { - LOG.info("Running testWithLoad: numClients=" + numClients - + ", numMessages=" + numMessages); - - final MiniRaftCluster cluster = getCluster(); - 
LOG.info(cluster.printServers()); - - final List<Client4TestWithLoad> clients - = Stream.iterate(0, i -> i+1).limit(numClients) - .map(i -> cluster.createClient(String.valueOf((char)('a' + i)), null)) - .map(c -> new Client4TestWithLoad(c, numMessages)) - .collect(Collectors.toList()); - clients.forEach(Thread::start); - - int count = 0; - for(int lastStep = 0;; ) { - if (clients.stream().filter(Client4TestWithLoad::isRunning).count() == 0) { - break; - } - - final int n = clients.stream().mapToInt(c -> c.step.get()).sum(); - if (n - lastStep < 50 * numClients) { // Change leader at least 50 steps. - Thread.sleep(10); - continue; - } - lastStep = n; - count++; - - RaftServerImpl leader = cluster.getLeader(); - if (leader != null) { - final String oldLeader = leader.getId(); - LOG.info("Block all requests sent by leader " + oldLeader); - String newLeader = RaftTestUtil.changeLeader(cluster, oldLeader); - LOG.info("Changed leader from " + oldLeader + " to " + newLeader); - Assert.assertFalse(newLeader.equals(oldLeader)); - } - } - - for(Client4TestWithLoad c : clients) { - c.join(); - } - for(Client4TestWithLoad c : clients) { - if (c.exceptionInClientThread != null) { - throw new AssertionError(c.exceptionInClientThread); - } - RaftTestUtil.assertLogEntries(cluster.getServers(), c.messages); - } - - LOG.info("Leader change count=" + count + cluster.printAllLogs()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java b/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java deleted file mode 100644 index 195cbec..0000000 --- a/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Licensed to the Apache Software Foundation 
(ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.raft;

import org.apache.log4j.Level;
import org.apache.raft.RaftTestUtil.SimpleMessage;
import org.apache.raft.client.RaftClient;
import org.apache.raft.client.RaftClientRequestSender;
import org.apache.raft.client.impl.RaftClientImpl;
import org.apache.raft.protocol.RaftClientReply;
import org.apache.raft.protocol.RaftClientRequest;
import org.apache.raft.protocol.RaftPeer;
import org.apache.raft.server.impl.RaftServerImpl;
import org.apache.raft.server.simulation.RequestHandler;
import org.apache.raft.server.storage.RaftLog;
import org.apache.raft.util.RaftUtils;
import org.junit.*;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;

import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM;

/**
 * Tests how a client is redirected when it sends a request to a server that
 * is no longer the leader, with and without a concurrent change of the peer
 * configuration. Subclasses supply the cluster through {@link #initCluster()}.
 */
public abstract class RaftNotLeaderExceptionBaseTest {
  static {
    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
    RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
    RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
  }

  public static final Logger LOG =
      LoggerFactory.getLogger(RaftNotLeaderExceptionBaseTest.class);
  public static final int NUM_PEERS = 3;

  @Rule
  public Timeout globalTimeout = new Timeout(60 * 1000);

  private MiniRaftCluster cluster;

  /** @return the cluster to run the tests against; started in {@link #setup()}. */
  public abstract MiniRaftCluster initCluster() throws IOException;

  @Before
  public void setup() throws IOException {
    this.cluster = initCluster();
    cluster.start();
  }

  @After
  public void tearDown() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  /**
   * Sends a request carrying {@code message} directly to {@code serverId}
   * through {@code rpc}, bypassing the client's own leader tracking. The
   * remote peer's rpc server may not be ready yet, so an attempt that fails
   * with an {@link IOException} is retried after one second, for at most ten
   * attempts.
   *
   * @return the reply, or null if no attempt produced one
   */
  private static RaftClientReply sendWithRetry(RaftClientRequestSender rpc,
      String serverId, String message) throws Exception {
    RaftClientReply reply = null;
    for (int i = 0; reply == null && i < 10; i++) {
      try {
        reply = rpc.sendRequest(
            new RaftClientRequest("client", serverId, DEFAULT_SEQNUM,
                new SimpleMessage(message)));
      } catch (IOException ignored) {
        // Deliberately ignored: wait and try again.
        Thread.sleep(1000);
      }
    }
    return reply;
  }

  /**
   * After a leader change, a request sent to the old leader must be answered
   * with a not-leader reply that suggests the new leader, and the client must
   * afterwards succeed on its own.
   */
  @Test
  public void testHandleNotLeaderException() throws Exception {
    RaftTestUtil.waitForLeader(cluster);
    final String leaderId = cluster.getLeader().getId();
    // try-with-resources closes the client even when an assertion fails.
    try (final RaftClient client = cluster.createClient("client", leaderId)) {
      RaftClientReply reply = client.send(new SimpleMessage("m1"));
      Assert.assertTrue(reply.isSuccess());

      // enforce leader change
      String newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
      Assert.assertNotEquals(leaderId, newLeader);

      RaftClientRequestSender rpc = ((RaftClientImpl)client).getRequestSender();
      reply = sendWithRetry(rpc, leaderId, "m2");
      Assert.assertNotNull(reply);
      Assert.assertFalse(reply.isSuccess());
      Assert.assertTrue(reply.isNotLeader());
      Assert.assertEquals(newLeader,
          reply.getNotLeaderException().getSuggestedLeader().getId());

      reply = client.send(new SimpleMessage("m3"));
      Assert.assertTrue(reply.isSuccess());
    }
  }

  /**
   * Same as {@link #testHandleNotLeaderException()}, but two peers are added
   * to the configuration after the leader change; the not-leader reply must
   * then carry exactly the peers of the cluster.
   */
  @Test
  public void testNotLeaderExceptionWithReconf() throws Exception {
    Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));

    final String leaderId = cluster.getLeader().getId();
    // try-with-resources closes the client even when an assertion fails.
    try (final RaftClient client = cluster.createClient("client", leaderId)) {
      // enforce leader change
      String newLeader = RaftTestUtil.changeLeader(cluster, leaderId);
      Assert.assertNotEquals(leaderId, newLeader);

      // also add two more peers
      MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
          new String[]{"ss1", "ss2"}, true);
      // trigger setConfiguration
      LOG.info("Start changing the configuration: {}",
          Arrays.asList(change.allPeersInNewConf));
      try (final RaftClient c2 = cluster.createClient("client2", newLeader)) {
        RaftClientReply reply = c2.setConfiguration(change.allPeersInNewConf);
        Assert.assertTrue(reply.isSuccess());
      }
      LOG.info(cluster.printServers());

      RaftClientRequestSender rpc = ((RaftClientImpl)client).getRequestSender();
      RaftClientReply reply = sendWithRetry(rpc, leaderId, "m1");
      Assert.assertNotNull(reply);
      Assert.assertFalse(reply.isSuccess());
      Assert.assertTrue(reply.isNotLeader());
      Assert.assertEquals(newLeader,
          reply.getNotLeaderException().getSuggestedLeader().getId());
      Collection<RaftPeer> peers = cluster.getPeers();
      RaftPeer[] peersFromReply = reply.getNotLeaderException().getPeers();
      Assert.assertEquals(peers.size(), peersFromReply.length);
      for (RaftPeer p : peersFromReply) {
        Assert.assertTrue(peers.contains(p));
      }

      reply = client.send(new SimpleMessage("m2"));
      Assert.assertTrue(reply.isSuccess());
    }
  }
}
