http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java deleted file mode 100644 index fbbcb85..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java +++ /dev/null @@ -1,582 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.impl; - -import com.google.common.base.Preconditions; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.*; -import org.apache.raft.server.storage.RaftLog; -import org.apache.raft.shaded.proto.RaftProtos.LeaderNoOp; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.statemachine.TransactionContext; -import org.apache.raft.util.Daemon; -import org.apache.raft.util.ProtoUtils; -import org.apache.raft.util.Timestamp; -import org.slf4j.Logger; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static org.apache.raft.server.RaftServerConfigKeys.*; -import static org.apache.raft.server.impl.LeaderState.StateUpdateEventType.*; - -/** - * States for leader only. It contains three different types of processors: - * 1. RPC senders: each thread is appending log to a follower - * 2. EventProcessor: a single thread updating the raft server's state based on - * status of log appending response - * 3. 
PendingRequestHandler: a handler sending back responses to clients when - * corresponding log entries are committed - */ -public class LeaderState { - private static final Logger LOG = RaftServerImpl.LOG; - - enum StateUpdateEventType { - STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS - } - - enum BootStrapProgress { - NOPROGRESS, PROGRESSING, CAUGHTUP - } - - static class StateUpdateEvent { - final StateUpdateEventType type; - final long newTerm; - - StateUpdateEvent(StateUpdateEventType type, long newTerm) { - this.type = type; - this.newTerm = newTerm; - } - } - - static final StateUpdateEvent UPDATE_COMMIT_EVENT = - new StateUpdateEvent(StateUpdateEventType.UPDATECOMMIT, -1); - static final StateUpdateEvent STAGING_PROGRESS_EVENT = - new StateUpdateEvent(StateUpdateEventType.STAGINGPROGRESS, -1); - - private final RaftServerImpl server; - private final RaftLog raftLog; - private final long currentTerm; - private volatile ConfigurationStagingState stagingState; - private List<List<FollowerInfo>> voterLists; - - /** - * The list of threads appending entries to followers. - * The list is protected by the RaftServer's lock. 
- */ - private final List<LogAppender> senders; - private final BlockingQueue<StateUpdateEvent> eventQ; - private final EventProcessor processor; - private final PendingRequests pendingRequests; - private volatile boolean running = true; - - private final int stagingCatchupGap; - private final int snapshotChunkMaxSize; - private final int syncInterval; - - LeaderState(RaftServerImpl server, RaftProperties properties) { - this.server = server; - - stagingCatchupGap = properties.getInt( - RAFT_SERVER_STAGING_CATCHUP_GAP_KEY, - RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT); - snapshotChunkMaxSize = properties.getInt( - RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY, - RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT); - syncInterval = properties.getInt( - RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY, - RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT); - - final ServerState state = server.getState(); - this.raftLog = state.getLog(); - this.currentTerm = state.getCurrentTerm(); - eventQ = new ArrayBlockingQueue<>(4096); - processor = new EventProcessor(); - pendingRequests = new PendingRequests(server); - - final RaftConfiguration conf = server.getRaftConf(); - Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId()); - final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); - final long nextIndex = raftLog.getNextIndex(); - senders = new ArrayList<>(others.size()); - for (RaftPeer p : others) { - FollowerInfo f = new FollowerInfo(p, t, nextIndex, true); - senders.add(server.getLogAppenderFactory().getLogAppender(server, this, f)); - } - voterLists = divideFollowers(conf); - } - - void start() { - // In the beginning of the new term, replicate an empty entry in order - // to finally commit entries in the previous term. - // Also this message can help identify the last committed index when - // the leader peer is just started. 
- final LogEntryProto placeHolder = LogEntryProto.newBuilder() - .setTerm(server.getState().getCurrentTerm()) - .setIndex(raftLog.getNextIndex()) - .setNoOp(LeaderNoOp.newBuilder()).build(); - raftLog.append(placeHolder); - - processor.start(); - startSenders(); - } - - private void startSenders() { - senders.forEach(Thread::start); - } - - void stop() { - this.running = false; - // do not interrupt event processor since it may be in the middle of logSync - for (LogAppender sender : senders) { - sender.stopSender(); - sender.interrupt(); - } - try { - pendingRequests.sendNotLeaderResponses(); - } catch (IOException e) { - LOG.warn("Caught exception in sendNotLeaderResponses", e); - } - } - - void notifySenders() { - senders.forEach(LogAppender::notifyAppend); - } - - boolean inStagingState() { - return stagingState != null; - } - - ConfigurationStagingState getStagingState() { - return stagingState; - } - - long getCurrentTerm() { - return currentTerm; - } - - int getSnapshotChunkMaxSize() { - return snapshotChunkMaxSize; - } - - int getSyncInterval() { - return syncInterval; - } - - /** - * Start bootstrapping new peers - */ - PendingRequest startSetConfiguration(SetConfigurationRequest request) { - Preconditions.checkState(running && !inStagingState()); - - RaftPeer[] peersInNewConf = request.getPeersInNewConf(); - Collection<RaftPeer> peersToBootStrap = RaftConfiguration - .computeNewPeers(peersInNewConf, server.getRaftConf()); - - // add the request to the pending queue - final PendingRequest pending = pendingRequests.addConfRequest(request); - - ConfigurationStagingState stagingState = new ConfigurationStagingState( - peersToBootStrap, new PeerConfiguration(Arrays.asList(peersInNewConf))); - Collection<RaftPeer> newPeers = stagingState.getNewPeers(); - // set the staging state - this.stagingState = stagingState; - - if (newPeers.isEmpty()) { - applyOldNewConf(); - } else { - // update the LeaderState's sender list - addSenders(newPeers); - } - return pending; 
- } - - PendingRequest addPendingRequest(long index, RaftClientRequest request, - TransactionContext entry) { - return pendingRequests.addPendingRequest(index, request, entry); - } - - private void applyOldNewConf() { - final ServerState state = server.getState(); - final RaftConfiguration current = server.getRaftConf(); - final RaftConfiguration oldNewConf= stagingState.generateOldNewConf(current, - state.getLog().getNextIndex()); - // apply the (old, new) configuration to log, and use it as the current conf - long index = state.getLog().append(state.getCurrentTerm(), oldNewConf); - updateConfiguration(index, oldNewConf); - - this.stagingState = null; - notifySenders(); - } - - private void updateConfiguration(long logIndex, RaftConfiguration newConf) { - voterLists = divideFollowers(newConf); - server.getState().setRaftConf(logIndex, newConf); - } - - /** - * After receiving a setConfiguration request, the leader should update its - * RpcSender list. - */ - void addSenders(Collection<RaftPeer> newMembers) { - final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); - final long nextIndex = raftLog.getNextIndex(); - for (RaftPeer peer : newMembers) { - FollowerInfo f = new FollowerInfo(peer, t, nextIndex, false); - LogAppender sender = server.getLogAppenderFactory() - .getLogAppender(server, this, f); - senders.add(sender); - sender.start(); - } - } - - /** - * Update the RpcSender list based on the current configuration - */ - private void updateSenders(RaftConfiguration conf) { - Preconditions.checkState(conf.isStable() && !inStagingState()); - Iterator<LogAppender> iterator = senders.iterator(); - while (iterator.hasNext()) { - LogAppender sender = iterator.next(); - if (!conf.containsInConf(sender.getFollower().getPeer().getId())) { - iterator.remove(); - sender.stopSender(); - sender.interrupt(); - } - } - } - - void submitUpdateStateEvent(StateUpdateEvent event) { - try { - eventQ.put(event); - } catch (InterruptedException e) { - 
LOG.info("Interrupted when adding event {} into the queue", event); - } - } - - private void prepare() { - synchronized (server) { - if (running) { - final RaftConfiguration conf = server.getRaftConf(); - if (conf.isTransitional() && server.getState().isConfCommitted()) { - // the configuration is in transitional state, and has been committed - // so it is time to generate and replicate (new) conf. - replicateNewConf(); - } - } - } - } - - /** - * The processor thread takes the responsibility to update the raft server's - * state, such as changing to follower, or updating the committed index. - */ - private class EventProcessor extends Daemon { - @Override - public void run() { - // apply an empty message; check if necessary to replicate (new) conf - prepare(); - - while (running) { - try { - StateUpdateEvent event = eventQ.poll(server.getMaxTimeoutMs(), - TimeUnit.MILLISECONDS); - synchronized (server) { - if (running) { - handleEvent(event); - } - } - // the updated configuration does not need to be sync'ed here - } catch (InterruptedException e) { - final String s = server.getId() + " " + getClass().getSimpleName() - + " thread is interrupted "; - if (!running) { - LOG.info(s + " gracefully; server=" + server); - } else { - LOG.warn(s + " UNEXPECTEDLY; server=" + server, e); - throw new RuntimeException(e); - } - } catch (IOException e) { - LOG.warn("Failed to persist new votedFor/term.", e); - // the failure should happen while changing the state to follower - // thus the in-memory state should have been updated - Preconditions.checkState(!running); - } - } - } - } - - private void handleEvent(StateUpdateEvent e) throws IOException { - if (e == null) { - if (inStagingState()) { - checkNewPeers(); - } - } else { - if (e.type == STEPDOWN) { - server.changeToFollower(e.newTerm, true); - } else if (e.type == UPDATECOMMIT) { - updateLastCommitted(); - } else if (e.type == STAGINGPROGRESS) { - checkNewPeers(); - } - } - } - - /** - * So far we use a simple 
implementation for catchup checking: - * 1. If the latest rpc time of the remote peer is before 3 * max_timeout, - * the peer made no progress for that long. We should fail the whole - * setConfiguration request. - * 2. If the peer's matching index is just behind for a small gap, and the - * peer was updated recently (within max_timeout), declare the peer as - * caught-up. - * 3. Otherwise the peer is making progressing. Keep waiting. - */ - private BootStrapProgress checkProgress(FollowerInfo follower, - long committed) { - Preconditions.checkArgument(!follower.isAttendingVote()); - final Timestamp progressTime = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); - final Timestamp timeoutTime = new Timestamp().addTimeMs(-3*server.getMaxTimeoutMs()); - if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) { - LOG.debug("{} detects a follower {} timeout for bootstrapping," + - " timeoutTime: {}", server.getId(), follower, timeoutTime); - return BootStrapProgress.NOPROGRESS; - } else if (follower.getMatchIndex() + stagingCatchupGap > committed - && follower.getLastRpcResponseTime().compareTo(progressTime) > 0) { - return BootStrapProgress.CAUGHTUP; - } else { - return BootStrapProgress.PROGRESSING; - } - } - - private Collection<BootStrapProgress> checkAllProgress(long committed) { - Preconditions.checkState(inStagingState()); - return senders.stream() - .filter(sender -> !sender.getFollower().isAttendingVote()) - .map(sender -> checkProgress(sender.getFollower(), committed)) - .collect(Collectors.toCollection(ArrayList::new)); - } - - private void checkNewPeers() { - if (!inStagingState()) { - // it is possible that the bootstrapping is done and we still have - // remaining STAGINGPROGRESS event to handle. 
- updateLastCommitted(); - } else { - final long committedIndex = server.getState().getLog() - .getLastCommittedIndex(); - Collection<BootStrapProgress> reports = checkAllProgress(committedIndex); - if (reports.contains(BootStrapProgress.NOPROGRESS)) { - LOG.debug("{} fails the setConfiguration request", server.getId()); - stagingState.fail(); - } else if (!reports.contains(BootStrapProgress.PROGRESSING)) { - // all caught up! - applyOldNewConf(); - for (LogAppender sender : senders) { - sender.getFollower().startAttendVote(); - } - } - } - } - - boolean isBootStrappingPeer(String peerId) { - return inStagingState() && getStagingState().contains(peerId); - } - - private void updateLastCommitted() { - final String selfId = server.getId(); - final RaftConfiguration conf = server.getRaftConf(); - long majorityInNewConf = computeLastCommitted(voterLists.get(0), - conf.containsInConf(selfId)); - final long oldLastCommitted = raftLog.getLastCommittedIndex(); - final LogEntryProto[] entriesToCommit; - if (!conf.isTransitional()) { - // copy the entries that may get committed out of the raftlog, to prevent - // the possible race that the log gets purged after the statemachine does - // a snapshot - entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, - Math.max(majorityInNewConf, oldLastCommitted) + 1); - server.getState().updateStatemachine(majorityInNewConf, currentTerm); - } else { // configuration is in transitional state - long majorityInOldConf = computeLastCommitted(voterLists.get(1), - conf.containsInOldConf(selfId)); - final long majority = Math.min(majorityInNewConf, majorityInOldConf); - entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, - Math.max(majority, oldLastCommitted) + 1); - server.getState().updateStatemachine(majority, currentTerm); - } - checkAndUpdateConfiguration(entriesToCommit); - } - - private boolean committedConf(LogEntryProto[] entries) { - final long currentCommitted = raftLog.getLastCommittedIndex(); - for (LogEntryProto entry 
: entries) { - if (entry.getIndex() <= currentCommitted && - ProtoUtils.isConfigurationLogEntry(entry)) { - return true; - } - } - return false; - } - - private void checkAndUpdateConfiguration(LogEntryProto[] entriesToCheck) { - final RaftConfiguration conf = server.getRaftConf(); - if (committedConf(entriesToCheck)) { - if (conf.isTransitional()) { - replicateNewConf(); - } else { // the (new) log entry has been committed - LOG.debug("{} sends success to setConfiguration request", server.getId()); - pendingRequests.replySetConfiguration(); - // if the leader is not included in the current configuration, step down - if (!conf.containsInConf(server.getId())) { - LOG.info("{} is not included in the new configuration {}. Step down.", - server.getId(), conf); - try { - // leave some time for all RPC senders to send out new conf entry - Thread.sleep(server.getMinTimeoutMs()); - } catch (InterruptedException ignored) { - } - // the pending request handler will send NotLeaderException for - // pending client requests when it stops - server.close(); - } - } - } - } - - /** - * when the (old, new) log entry has been committed, should replicate (new): - * 1) append (new) to log - * 2) update conf to (new) - * 3) update RpcSenders list - * 4) start replicating the log entry - */ - private void replicateNewConf() { - final RaftConfiguration conf = server.getRaftConf(); - final RaftConfiguration newConf = RaftConfiguration.newBuilder() - .setConf(conf) - .setLogEntryIndex(raftLog.getNextIndex()) - .build(); - // stop the LogAppender if the corresponding follower is no longer in the conf - updateSenders(newConf); - long index = raftLog.append(server.getState().getCurrentTerm(), newConf); - updateConfiguration(index, newConf); - notifySenders(); - } - - private long computeLastCommitted(List<FollowerInfo> followers, - boolean includeSelf) { - final int length = includeSelf ? 
followers.size() + 1 : followers.size(); - final long[] indices = new long[length]; - for (int i = 0; i < followers.size(); i++) { - indices[i] = followers.get(i).getMatchIndex(); - } - if (includeSelf) { - // note that we also need to wait for the local disk I/O - indices[length - 1] = raftLog.getLatestFlushedIndex(); - } - - Arrays.sort(indices); - return indices[(indices.length - 1) / 2]; - } - - private List<List<FollowerInfo>> divideFollowers(RaftConfiguration conf) { - List<List<FollowerInfo>> lists = new ArrayList<>(2); - List<FollowerInfo> listForNew = senders.stream() - .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId())) - .map(LogAppender::getFollower) - .collect(Collectors.toList()); - lists.add(listForNew); - if (conf.isTransitional()) { - List<FollowerInfo> listForOld = senders.stream() - .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId())) - .map(LogAppender::getFollower) - .collect(Collectors.toList()); - lists.add(listForOld); - } - return lists; - } - - PendingRequest returnNoConfChange(SetConfigurationRequest r) { - PendingRequest pending = new PendingRequest(r); - pending.setSuccessReply(null); - return pending; - } - - void replyPendingRequest(long logIndex, CompletableFuture<Message> message) { - pendingRequests.replyPendingRequest(logIndex, message); - } - - TransactionContext getTransactionContext(long index) { - return pendingRequests.getTransactionContext(index); - } - - private class ConfigurationStagingState { - private final Map<String, RaftPeer> newPeers; - private final PeerConfiguration newConf; - - ConfigurationStagingState(Collection<RaftPeer> newPeers, - PeerConfiguration newConf) { - Map<String, RaftPeer> map = new HashMap<>(); - for (RaftPeer peer : newPeers) { - map.put(peer.getId(), peer); - } - this.newPeers = Collections.unmodifiableMap(map); - this.newConf = newConf; - } - - RaftConfiguration generateOldNewConf(RaftConfiguration current, - long logIndex) { - return 
RaftConfiguration.newBuilder() - .setConf(newConf) - .setOldConf(current) - .setLogEntryIndex(logIndex) - .build(); - } - - Collection<RaftPeer> getNewPeers() { - return newPeers.values(); - } - - boolean contains(String peerId) { - return newPeers.containsKey(peerId); - } - - void fail() { - Iterator<LogAppender> iterator = senders.iterator(); - while (iterator.hasNext()) { - LogAppender sender = iterator.next(); - if (!sender.getFollower().isAttendingVote()) { - iterator.remove(); - sender.stopSender(); - sender.interrupt(); - } - } - LeaderState.this.stagingState = null; - // send back failure response to client's request - pendingRequests.failSetConfiguration( - new ReconfigurationTimeoutException("Fail to set configuration " - + newConf + ". Timeout when bootstrapping new peers.")); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java deleted file mode 100644 index cf613ca..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java +++ /dev/null @@ -1,480 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.impl; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.commons.io.IOUtils; -import org.apache.raft.server.impl.LeaderState.StateUpdateEventType; -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.server.storage.FileInfo; -import org.apache.raft.server.storage.RaftLog; -import org.apache.raft.shaded.com.google.protobuf.ByteString; -import org.apache.raft.shaded.proto.RaftProtos.*; -import org.apache.raft.statemachine.SnapshotInfo; -import org.apache.raft.util.Daemon; -import org.apache.raft.util.ProtoUtils; -import org.apache.raft.util.Timestamp; -import org.slf4j.Logger; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.nio.file.Path; -import java.util.*; - -import static org.apache.raft.server.RaftServerConfigKeys.*; -import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX; - -/** - * A daemon thread appending log entries to a follower peer. 
- */ -public class LogAppender extends Daemon { - public static final Logger LOG = RaftServerImpl.LOG; - - protected final RaftServerImpl server; - private final LeaderState leaderState; - protected final RaftLog raftLog; - protected final FollowerInfo follower; - private final int maxBufferSize; - private final boolean batchSending; - private final LogEntryBuffer buffer; - private final long leaderTerm; - - private volatile boolean sending = true; - - public LogAppender(RaftServerImpl server, LeaderState leaderState, FollowerInfo f) { - this.follower = f; - this.server = server; - this.leaderState = leaderState; - this.raftLog = server.getState().getLog(); - this.maxBufferSize = server.getProperties().getInt( - RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY, - RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT); - this.batchSending = server.getProperties().getBoolean( - RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY, - RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT); - this.buffer = new LogEntryBuffer(); - this.leaderTerm = server.getState().getCurrentTerm(); - } - - @Override - public String toString() { - return getClass().getSimpleName() + "(" + server.getId() + " -> " + - follower.getPeer().getId() + ")"; - } - - @Override - public void run() { - try { - checkAndSendAppendEntries(); - } catch (InterruptedException | InterruptedIOException e) { - LOG.info(this + " was interrupted: " + e); - } - } - - protected boolean isAppenderRunning() { - return sending; - } - - public void stopSender() { - this.sending = false; - } - - public FollowerInfo getFollower() { - return follower; - } - - /** - * A buffer for log entries with size limitation. 
- */ - private class LogEntryBuffer { - private final List<LogEntryProto> buf = new ArrayList<>(); - private int totalSize = 0; - - void addEntry(LogEntryProto entry) { - buf.add(entry); - totalSize += entry.getSerializedSize(); - } - - boolean isFull() { - return totalSize >= maxBufferSize; - } - - boolean isEmpty() { - return buf.isEmpty(); - } - - AppendEntriesRequestProto getAppendRequest(TermIndex previous) { - final AppendEntriesRequestProto request = server - .createAppendEntriesRequest(leaderTerm, follower.getPeer().getId(), - previous, buf, !follower.isAttendingVote()); - buf.clear(); - totalSize = 0; - return request; - } - - int getPendingEntryNum() { - return buf.size(); - } - } - - private TermIndex getPrevious() { - TermIndex previous = ServerProtoUtils.toTermIndex( - raftLog.get(follower.getNextIndex() - 1)); - if (previous == null) { - // if previous is null, nextIndex must be equal to the log start - // index (otherwise we will install snapshot). - Preconditions.checkState(follower.getNextIndex() == raftLog.getStartIndex(), - "follower's next index %s, local log start index %s", - follower.getNextIndex(), raftLog.getStartIndex()); - SnapshotInfo snapshot = server.getState().getLatestSnapshot(); - previous = snapshot == null ? 
null : snapshot.getTermIndex(); - } - return previous; - } - - protected AppendEntriesRequestProto createRequest() { - final TermIndex previous = getPrevious(); - final long leaderNext = raftLog.getNextIndex(); - long next = follower.getNextIndex() + buffer.getPendingEntryNum(); - boolean toSend = false; - - if (leaderNext == next && !buffer.isEmpty()) { - // no new entries, then send out the entries in the buffer - toSend = true; - } else if (leaderNext > next) { - while (leaderNext > next && !buffer.isFull()) { - // stop adding entry once the buffer size is >= the max size - buffer.addEntry(raftLog.get(next++)); - } - if (buffer.isFull() || !batchSending) { - // buffer is full or batch sending is disabled, send out a request - toSend = true; - } - } - - if (toSend || shouldHeartbeat()) { - return buffer.getAppendRequest(previous); - } - return null; - } - - /** Send an appendEntries RPC; retry indefinitely. */ - private AppendEntriesReplyProto sendAppendEntriesWithRetries() - throws InterruptedException, InterruptedIOException { - int retry = 0; - AppendEntriesRequestProto request = null; - while (isAppenderRunning()) { // keep retrying for IOException - try { - if (request == null || request.getEntriesCount() == 0) { - request = createRequest(); - } - - if (request == null) { - LOG.trace("{} need not send AppendEntries now." + - " Wait for more entries.", server.getId()); - return null; - } else if (!isAppenderRunning()) { - LOG.debug("LogAppender {} has been stopped. 
Skip the request.", this); - return null; - } - - follower.updateLastRpcSendTime(); - final AppendEntriesReplyProto r = server.getServerRpc() - .appendEntries(request); - follower.updateLastRpcResponseTime(); - - return r; - } catch (InterruptedIOException iioe) { - throw iioe; - } catch (IOException ioe) { - LOG.debug(this + ": failed to send appendEntries; retry " + retry++, ioe); - } - if (isAppenderRunning()) { - Thread.sleep(leaderState.getSyncInterval()); - } - } - return null; - } - - protected class SnapshotRequestIter - implements Iterable<InstallSnapshotRequestProto> { - private final SnapshotInfo snapshot; - private final List<FileInfo> files; - private FileInputStream in; - private int fileIndex = 0; - - private FileInfo currentFileInfo; - private byte[] currentBuf; - private long currentFileSize; - private long currentOffset = 0; - private int chunkIndex = 0; - - private final String requestId; - private int requestIndex = 0; - - public SnapshotRequestIter(SnapshotInfo snapshot, String requestId) - throws IOException { - this.snapshot = snapshot; - this.requestId = requestId; - this.files = snapshot.getFiles(); - if (files.size() > 0) { - startReadFile(); - } - } - - private void startReadFile() throws IOException { - currentFileInfo = files.get(fileIndex); - File snapshotFile = currentFileInfo.getPath().toFile(); - currentFileSize = snapshotFile.length(); - final int bufLength = - (int) Math.min(leaderState.getSnapshotChunkMaxSize(), currentFileSize); - currentBuf = new byte[bufLength]; - currentOffset = 0; - chunkIndex = 0; - in = new FileInputStream(snapshotFile); - } - - @Override - public Iterator<InstallSnapshotRequestProto> iterator() { - return new Iterator<InstallSnapshotRequestProto>() { - @Override - public boolean hasNext() { - return fileIndex < files.size(); - } - - @Override - public InstallSnapshotRequestProto next() { - if (fileIndex >= files.size()) { - throw new NoSuchElementException(); - } - int targetLength = (int) 
Math.min(currentFileSize - currentOffset, - leaderState.getSnapshotChunkMaxSize()); - FileChunkProto chunk; - try { - chunk = readFileChunk(currentFileInfo, in, currentBuf, - targetLength, currentOffset, chunkIndex); - boolean done = (fileIndex == files.size() - 1) && - chunk.getDone(); - InstallSnapshotRequestProto request = - server.createInstallSnapshotRequest(follower.getPeer().getId(), - requestId, requestIndex++, snapshot, - Lists.newArrayList(chunk), done); - currentOffset += targetLength; - chunkIndex++; - - if (currentOffset >= currentFileSize) { - in.close(); - fileIndex++; - if (fileIndex < files.size()) { - startReadFile(); - } - } - - return request; - } catch (IOException e) { - if (in != null) { - try { - in.close(); - } catch (IOException ignored) { - } - } - LOG.warn("Got exception when preparing InstallSnapshot request", e); - throw new RuntimeException(e); - } - } - }; - } - } - - private FileChunkProto readFileChunk(FileInfo fileInfo, - FileInputStream in, byte[] buf, int length, long offset, int chunkIndex) - throws IOException { - FileChunkProto.Builder builder = FileChunkProto.newBuilder() - .setOffset(offset).setChunkIndex(chunkIndex); - IOUtils.readFully(in, buf, 0, length); - Path relativePath = server.getState().getStorage().getStorageDir() - .relativizeToRoot(fileInfo.getPath()); - builder.setFilename(relativePath.toString()); - builder.setDone(offset + length == fileInfo.getFileSize()); - builder.setFileDigest( - ByteString.copyFrom(fileInfo.getFileDigest().getDigest())); - builder.setData(ByteString.copyFrom(buf, 0, length)); - return builder.build(); - } - - private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) - throws InterruptedException, InterruptedIOException { - String requestId = UUID.randomUUID().toString(); - InstallSnapshotReplyProto reply = null; - try { - for (InstallSnapshotRequestProto request : - new SnapshotRequestIter(snapshot, requestId)) { - follower.updateLastRpcSendTime(); - reply = 
server.getServerRpc().installSnapshot(request); - follower.updateLastRpcResponseTime(); - - if (!reply.getServerReply().getSuccess()) { - return reply; - } - } - } catch (InterruptedIOException iioe) { - throw iioe; - } catch (Exception ioe) { - LOG.warn(this + ": failed to install SnapshotInfo " + snapshot.getFiles(), - ioe); - return null; - } - - if (reply != null) { - follower.updateMatchIndex(snapshot.getTermIndex().getIndex()); - follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1); - LOG.info("{}: install snapshot-{} successfully on follower {}", - server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer()); - } - return reply; - } - - protected SnapshotInfo shouldInstallSnapshot() { - final long logStartIndex = raftLog.getStartIndex(); - // we should install snapshot if the follower needs to catch up and: - // 1. there is no local log entry but there is snapshot - // 2. or the follower's next index is smaller than the log start index - if (follower.getNextIndex() < raftLog.getNextIndex()) { - SnapshotInfo snapshot = server.getState().getLatestSnapshot(); - if (follower.getNextIndex() < logStartIndex || - (logStartIndex == INVALID_LOG_INDEX && snapshot != null)) { - return snapshot; - } - } - return null; - } - - /** Check and send appendEntries RPC */ - private void checkAndSendAppendEntries() - throws InterruptedException, InterruptedIOException { - while (isAppenderRunning()) { - if (shouldSendRequest()) { - SnapshotInfo snapshot = shouldInstallSnapshot(); - if (snapshot != null) { - LOG.info("{}: follower {}'s next index is {}," + - " log's start index is {}, need to install snapshot", - server.getId(), follower.getPeer(), follower.getNextIndex(), - raftLog.getStartIndex()); - - final InstallSnapshotReplyProto r = installSnapshot(snapshot); - if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) { - checkResponseTerm(r.getTerm()); - } // otherwise if r is null, retry the snapshot installation - } else { - final 
AppendEntriesReplyProto r = sendAppendEntriesWithRetries(); - if (r != null) { - handleReply(r); - } - } - } - if (isAppenderRunning() && !shouldAppendEntries( - follower.getNextIndex() + buffer.getPendingEntryNum())) { - final long waitTime = getHeartbeatRemainingTime( - follower.getLastRpcTime()); - if (waitTime > 0) { - synchronized (this) { - wait(waitTime); - } - } - } - } - } - - private void handleReply(AppendEntriesReplyProto reply) { - if (reply != null) { - switch (reply.getResult()) { - case SUCCESS: - final long oldNextIndex = follower.getNextIndex(); - final long nextIndex = reply.getNextIndex(); - if (nextIndex < oldNextIndex) { - throw new IllegalStateException("nextIndex=" + nextIndex - + " < oldNextIndex=" + oldNextIndex - + ", reply=" + ProtoUtils.toString(reply)); - } - - if (nextIndex > oldNextIndex) { - follower.updateMatchIndex(nextIndex - 1); - follower.updateNextIndex(nextIndex); - submitEventOnSuccessAppend(); - } - break; - case NOT_LEADER: - // check if should step down - checkResponseTerm(reply.getTerm()); - break; - case INCONSISTENCY: - follower.decreaseNextIndex(reply.getNextIndex()); - break; - case UNRECOGNIZED: - LOG.warn("{} received UNRECOGNIZED AppendResult from {}", - server.getId(), follower.getPeer().getId()); - break; - } - } - } - - protected void submitEventOnSuccessAppend() { - LeaderState.StateUpdateEvent e = follower.isAttendingVote() ? - LeaderState.UPDATE_COMMIT_EVENT : - LeaderState.STAGING_PROGRESS_EVENT; - leaderState.submitUpdateStateEvent(e); - } - - public synchronized void notifyAppend() { - this.notify(); - } - - /** Should the leader send appendEntries RPC to this follower? 
*/ - protected boolean shouldSendRequest() { - return shouldAppendEntries(follower.getNextIndex()) || shouldHeartbeat(); - } - - private boolean shouldAppendEntries(long followerIndex) { - return followerIndex < raftLog.getNextIndex(); - } - - private boolean shouldHeartbeat() { - return getHeartbeatRemainingTime(follower.getLastRpcTime()) <= 0; - } - - /** - * @return the time in milliseconds that the leader should send a heartbeat. - */ - protected long getHeartbeatRemainingTime(Timestamp lastTime) { - return server.getMinTimeoutMs() / 2 - lastTime.elapsedTimeMs(); - } - - protected void checkResponseTerm(long responseTerm) { - synchronized (server) { - if (isAppenderRunning() && follower.isAttendingVote() - && responseTerm > leaderState.getCurrentTerm()) { - leaderState.submitUpdateStateEvent( - new LeaderState.StateUpdateEvent(StateUpdateEventType.STEPDOWN, - responseTerm)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java deleted file mode 100644 index d77faff..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.raft.server.impl;

/** Factory abstraction for creating {@link LogAppender} instances. */
public interface LogAppenderFactory {
  /**
   * Creates a log appender from the given collaborators.
   *
   * @param server the raft server handed to the appender
   * @param state the leader state handed to the appender
   * @param f the follower information handed to the appender
   * @return the appender produced by this factory
   */
  LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
      FollowerInfo f);

  /**
   * Factory whose product is a plain {@link LogAppender} built directly from
   * the three arguments.
   */
  class SynchronousLogAppenderFactory implements LogAppenderFactory {
    @Override
    public LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
        FollowerInfo f) {
      final LogAppender appender = new LogAppender(server, state, f);
      return appender;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/PeerConfiguration.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/PeerConfiguration.java b/raft-server/src/main/java/org/apache/raft/server/impl/PeerConfiguration.java deleted file mode 100644 index 774a0c5..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/PeerConfiguration.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.raft.server.impl;

import com.google.common.base.Preconditions;
import org.apache.raft.protocol.RaftPeer;

import java.util.*;

/**
 * The peer configuration of a raft cluster.
 *
 * The objects of this class are immutable.
 */
class PeerConfiguration {
  /** Peers keyed by their id; wrapped as an unmodifiable map by the constructor. */
  private final Map<String, RaftPeer> peers;

  /**
   * Builds a configuration from the given peers, keyed by {@code RaftPeer#getId()}.
   * A later peer with the same id replaces an earlier one.
   *
   * @param peers the peers of this configuration; must be non-null and non-empty
   */
  PeerConfiguration(Iterable<RaftPeer> peers) {
    Preconditions.checkNotNull(peers);
    Map<String, RaftPeer> map = new HashMap<>();
    for (RaftPeer p : peers) {
      map.put(p.getId(), p);
    }
    this.peers = Collections.unmodifiableMap(map);
    Preconditions.checkState(!this.peers.isEmpty());
  }

  /** @return an unmodifiable view of all the peers in this configuration. */
  Collection<RaftPeer> getPeers() {
    return Collections.unmodifiableCollection(peers.values());
  }

  /** @return the number of peers in this configuration. */
  int size() {
    return peers.size();
  }

  @Override
  public String toString() {
    return peers.values().toString();
  }

  /** @return the peer with the given id, or null if there is no such peer. */
  RaftPeer getPeer(String id) {
    return peers.get(id);
  }

  /** @return true if a peer with the given id is in this configuration. */
  boolean contains(String id) {
    return peers.containsKey(id);
  }

  /** @return a new list of all the peers whose id differs from the given self id. */
  List<RaftPeer> getOtherPeers(String selfId) {
    List<RaftPeer> others = new ArrayList<>();
    for (RaftPeer peer : peers.values()) {
      if (!selfId.equals(peer.getId())) {
        others.add(peer);
      }
    }
    return others;
  }

  /**
   * Tests whether self together with the given others form a strict majority
   * of this configuration.
   *
   * Ids that are not in this configuration are ignored, and each id is counted
   * at most once. The result is decided after all ids have been counted, so a
   * configuration whose majority is reached by self alone (e.g. a single-peer
   * configuration with an empty {@code others}) is reported correctly.
   *
   * @param others the ids of the other peers; must not contain {@code selfId}
   * @param selfId the id of the local peer
   * @return true if the counted peers are more than half of this configuration
   * @throws IllegalArgumentException if {@code others} contains {@code selfId}
   */
  boolean hasMajority(Collection<String> others, String selfId) {
    Preconditions.checkArgument(!others.contains(selfId));
    int num = contains(selfId) ? 1 : 0;
    // de-duplicate so that a repeated id cannot be counted twice
    for (String other : new HashSet<>(others)) {
      if (contains(other)) {
        num++;
      }
    }
    return num > size() / 2;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequest.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequest.java b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequest.java deleted file mode 100644 index 689566a..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequest.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.raft.server.impl;

import com.google.common.base.Preconditions;
import org.apache.raft.protocol.Message;
import org.apache.raft.protocol.RaftClientReply;
import org.apache.raft.protocol.RaftClientRequest;
import org.apache.raft.protocol.SetConfigurationRequest;
import org.apache.raft.statemachine.TransactionContext;

import java.util.concurrent.CompletableFuture;

/**
 * A client request together with the future through which its reply (or
 * failure) is delivered. Instances are ordered by {@link #getIndex()}.
 */
public class PendingRequest implements Comparable<PendingRequest> {
  /** Index associated with this request; a primitive, so no boxing on compare. */
  private final long index;
  private final RaftClientRequest request;
  private final TransactionContext entry;
  private final CompletableFuture<RaftClientReply> future;

  PendingRequest(long index, RaftClientRequest request,
      TransactionContext entry) {
    this.index = index;
    this.request = request;
    this.entry = entry;
    this.future = new CompletableFuture<>();
  }

  /**
   * Creates a pending set-configuration request. It has no transaction entry
   * and uses {@link RaftServerConstants#INVALID_LOG_INDEX} as its index.
   */
  PendingRequest(SetConfigurationRequest request) {
    this(RaftServerConstants.INVALID_LOG_INDEX, request, null);
  }

  long getIndex() {
    return index;
  }

  RaftClientRequest getRequest() {
    return request;
  }

  /** @return the future completed by {@link #setReply} or {@link #setException}. */
  public CompletableFuture<RaftClientReply> getFuture() {
    return future;
  }

  /** @return the transaction entry, which is null for set-configuration requests. */
  TransactionContext getEntry() {
    return entry;
  }

  /**
   * Completes the future exceptionally.
   *
   * @throws IllegalArgumentException if {@code e} is null
   */
  synchronized void setException(Throwable e) {
    Preconditions.checkArgument(e != null);
    future.completeExceptionally(e);
  }

  /**
   * Completes the future with the given reply.
   *
   * @throws IllegalArgumentException if {@code r} is null
   */
  synchronized void setReply(RaftClientReply r) {
    Preconditions.checkArgument(r != null);
    future.complete(r);
  }

  /** Completes the future with a reply built from the request and the given message. */
  void setSuccessReply(Message message) {
    setReply(new RaftClientReply(getRequest(), message));
  }

  @Override
  public int compareTo(PendingRequest that) {
    return Long.compare(this.index, that.index);
  }

  @Override
  public String toString() {
    // the closing parenthesis was missing, leaving the output unbalanced
    return getClass().getSimpleName() + "(index=" + index
        + ", request=" + request + ")";
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java 
---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java deleted file mode 100644 index 32f127e..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.raft.server.impl;

import com.google.common.base.Preconditions;
import org.apache.raft.protocol.*;
import org.apache.raft.statemachine.TransactionContext;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
 * Bookkeeping of the client requests that are waiting for a reply: the
 * requests keyed by index, plus at most one pending set-configuration request.
 */
class PendingRequests {
  private static final Logger LOG = RaftServerImpl.LOG;

  private PendingRequest pendingSetConf;
  private final RaftServerImpl server;
  private final ConcurrentMap<Long, PendingRequest> pendingRequests = new ConcurrentHashMap<>();
  private PendingRequest last = null;

  PendingRequests(RaftServerImpl server) {
    this.server = server;
  }

  /**
   * Registers a non-read-only request under the given index. The index must
   * directly follow the index of the previously added request, if any.
   */
  PendingRequest addPendingRequest(long index, RaftClientRequest request,
      TransactionContext entry) {
    // externally synced for now
    Preconditions.checkArgument(!request.isReadOnly());
    Preconditions.checkState(last == null || index == last.getIndex() + 1);

    final PendingRequest created = new PendingRequest(index, request, entry);
    pendingRequests.put(index, created);
    last = created;
    return created;
  }

  /** Registers the single pending set-configuration request; none may be pending already. */
  PendingRequest addConfRequest(SetConfigurationRequest request) {
    Preconditions.checkState(pendingSetConf == null);
    final PendingRequest created = new PendingRequest(request);
    pendingSetConf = created;
    return created;
  }

  /** Replies success to the pending set-configuration request, if there is one. */
  void replySetConfiguration() {
    // The pending request may legitimately be absent: a new leader can commit
    // the new configuration before it receives the client's retry.
    if (pendingSetConf == null) {
      return;
    }
    // setConfiguration does not wait for the state machine; the reply is sent
    // as soon as the configuration is committed.
    pendingSetConf.setSuccessReply(null);
    pendingSetConf = null;
  }

  /** Fails the pending set-configuration request, which must exist, with the given exception. */
  void failSetConfiguration(RaftException e) {
    Preconditions.checkState(pendingSetConf != null);
    pendingSetConf.setException(e);
    pendingSetConf = null;
  }

  /** @return the transaction entry of the request at the given index, or null if none is pending. */
  TransactionContext getTransactionContext(long index) {
    final PendingRequest found = pendingRequests.get(index);
    // A request can be missing when this peer has just become the leader and
    // commits transactions that were received by the previous leader.
    if (found == null) {
      return null;
    }
    return found.getEntry();
  }

  /**
   * Completes the request at the given index once the message future
   * completes; does nothing when no request is pending at that index.
   */
  void replyPendingRequest(long index, CompletableFuture<Message> messageFuture) {
    final PendingRequest target = pendingRequests.get(index);
    if (target == null) {
      return;
    }
    Preconditions.checkState(target.getIndex() == index);

    messageFuture.whenComplete((message, failure) -> {
      if (failure != null) {
        target.setException(failure);
      } else {
        target.setSuccessReply(message);
      }
    });
  }

  /**
   * The leader state is stopped. Send NotLeaderException to all the pending
   * requests since they have not got applied to the state machine yet.
   */
  void sendNotLeaderResponses() throws IOException {
    LOG.info("{} sends responses before shutting down PendingRequestsHandler",
        server.getId());

    final Collection<TransactionContext> entries = pendingRequests.values()
        .stream()
        .map(p -> p.getEntry())
        .collect(Collectors.toList());
    // notify the state machine about stepping down
    server.getStateMachine().notifyNotLeader(entries);

    for (PendingRequest p : pendingRequests.values()) {
      setNotLeaderException(p);
    }
    if (pendingSetConf != null) {
      setNotLeaderException(pendingSetConf);
    }
  }

  /** Replies to the given request with the server's not-leader exception. */
  private void setNotLeaderException(PendingRequest pending) {
    pending.setReply(new RaftClientReply(pending.getRequest(),
        server.generateNotLeaderException()));
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java deleted file mode 100644 index 4879314..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.raft.server.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.raft.protocol.RaftPeer;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

/**
 * The configuration of the raft cluster.
 *
 * A configuration without an old peer configuration is stable, i.e. no peer
 * change is in progress. A configuration that also carries an old peer
 * configuration is transitional, i.e. in the middle of a peer change.
 *
 * The objects of this class are immutable.
 */
public class RaftConfiguration {
  /** Create a {@link Builder}. */
  public static Builder newBuilder() {
    return new Builder();
  }

  /** To build {@link RaftConfiguration} objects. */
  public static class Builder {
    private PeerConfiguration oldConf;
    private PeerConfiguration conf;
    private long logEntryIndex = RaftServerConstants.INVALID_LOG_INDEX;

    private boolean forceStable = false;
    private boolean forceTransitional = false;

    private Builder() {}

    /** Sets the (new) peer configuration; it may be set only once. */
    public Builder setConf(PeerConfiguration conf) {
      Preconditions.checkNotNull(conf);
      Preconditions.checkState(this.conf == null, "conf is already set.");
      this.conf = conf;
      return this;
    }

    public Builder setConf(Iterable<RaftPeer> peers) {
      final PeerConfiguration peerConf = new PeerConfiguration(peers);
      return setConf(peerConf);
    }

    public Builder setConf(RaftPeer[] peers) {
      final List<RaftPeer> peerList = Arrays.asList(peers);
      return setConf(peerList);
    }

    /**
     * Takes the new peer configuration of the given transitional
     * configuration and requires the built configuration to be stable.
     */
    Builder setConf(RaftConfiguration transitionalConf) {
      Preconditions.checkNotNull(transitionalConf);
      Preconditions.checkState(transitionalConf.isTransitional());

      Preconditions.checkState(!forceTransitional);
      forceStable = true;
      return setConf(transitionalConf.conf);
    }

    /** Sets the old peer configuration; it may be set only once. */
    public Builder setOldConf(PeerConfiguration oldConf) {
      Preconditions.checkNotNull(oldConf);
      Preconditions.checkState(this.oldConf == null, "oldConf is already set.");
      this.oldConf = oldConf;
      return this;
    }

    public Builder setOldConf(Iterable<RaftPeer> oldPeers) {
      final PeerConfiguration peerConf = new PeerConfiguration(oldPeers);
      return setOldConf(peerConf);
    }

    public Builder setOldConf(RaftPeer[] oldPeers) {
      final List<RaftPeer> peerList = Arrays.asList(oldPeers);
      return setOldConf(peerList);
    }

    /**
     * Takes the peer configuration of the given stable configuration as the
     * old configuration and requires the built configuration to be
     * transitional.
     */
    Builder setOldConf(RaftConfiguration stableConf) {
      Preconditions.checkNotNull(stableConf);
      Preconditions.checkState(stableConf.isStable());

      Preconditions.checkState(!forceStable);
      forceTransitional = true;
      return setOldConf(stableConf.conf);
    }

    /** Sets the log entry index; it must be valid and may be set only once. */
    public Builder setLogEntryIndex(long logEntryIndex) {
      Preconditions.checkArgument(
          logEntryIndex != RaftServerConstants.INVALID_LOG_INDEX);
      Preconditions.checkState(
          this.logEntryIndex == RaftServerConstants.INVALID_LOG_INDEX,
          "logEntryIndex is already set.");
      this.logEntryIndex = logEntryIndex;
      return this;
    }

    /** Build a {@link RaftConfiguration}. */
    public RaftConfiguration build() {
      // a forced-transitional build needs an old conf; a forced-stable one must not have it
      Preconditions.checkState(!forceTransitional || oldConf != null);
      Preconditions.checkState(!forceStable || oldConf == null);
      return new RaftConfiguration(conf, oldConf, logEntryIndex);
    }
  }

  /** Non-null only if this configuration is transitional. */
  private final PeerConfiguration oldConf;
  /**
   * The current peer configuration while this configuration is stable;
   * or the new peer configuration while this configuration is transitional.
   */
  private final PeerConfiguration conf;

  /** The index of the corresponding log entry for this configuration. */
  private final long logEntryIndex;

  private RaftConfiguration(PeerConfiguration conf, PeerConfiguration oldConf,
      long logEntryIndex) {
    Preconditions.checkNotNull(conf);
    this.conf = conf;
    this.oldConf = oldConf;
    this.logEntryIndex = logEntryIndex;
  }

  /** Is this configuration transitional, i.e. in the middle of a peer change? */
  boolean isTransitional() {
    return oldConf != null;
  }

  /** Is this configuration stable, i.e. no on-going peer change? */
  boolean isStable() {
    return oldConf == null;
  }

  /** @return true if the peer is in the current (or new) peer configuration. */
  boolean containsInConf(String peerId) {
    return conf.contains(peerId);
  }

  /** @return true if there is an old peer configuration and the peer is in it. */
  boolean containsInOldConf(String peerId) {
    if (oldConf == null) {
      return false;
    }
    return oldConf.contains(peerId);
  }

  /**
   * @return true if the peer is in the conf and, when an old conf exists,
   *         in the old conf as well.
   */
  boolean contains(String peerId) {
    if (!containsInConf(peerId)) {
      return false;
    }
    return oldConf == null || oldConf.contains(peerId);
  }

  /**
   * @return the peer corresponding to the given id;
   *         or return null if the peer is not in this configuration.
   */
  public RaftPeer getPeer(String id) {
    if (id == null) {
      return null;
    }
    final RaftPeer fromConf = conf.getPeer(id);
    if (fromConf != null || oldConf == null) {
      return fromConf;
    }
    return oldConf.getPeer(id);
  }

  /** @return all the peers from the conf, and the old conf if it exists. */
  public Collection<RaftPeer> getPeers() {
    final Collection<RaftPeer> all = new ArrayList<>(conf.getPeers());
    if (oldConf != null) {
      for (RaftPeer p : oldConf.getPeers()) {
        if (!all.contains(p)) {
          all.add(p);
        }
      }
    }
    return all;
  }

  /**
   * @return all the peers other than the given self id from the conf,
   *         and the old conf if it exists.
   */
  public Collection<RaftPeer> getOtherPeers(String selfId) {
    final Collection<RaftPeer> others = conf.getOtherPeers(selfId);
    if (oldConf != null) {
      for (RaftPeer p : oldConf.getOtherPeers(selfId)) {
        if (!others.contains(p)) {
          others.add(p);
        }
      }
    }
    return others;
  }

  /** @return true if the self id together with the others are in the majority. */
  boolean hasMajority(Collection<String> others, String selfId) {
    Preconditions.checkArgument(!others.contains(selfId));
    if (!conf.hasMajority(others, selfId)) {
      return false;
    }
    // while transitional, the old configuration must agree as well
    return oldConf == null || oldConf.hasMajority(others, selfId);
  }

  @Override
  public String toString() {
    final String current = conf.toString();
    if (oldConf == null) {
      return current;
    }
    return current + "old:" + oldConf;
  }

  /**
   * @return true if this configuration is stable and its conf consists of
   *         exactly as many peers as given, all of which it contains.
   */
  @VisibleForTesting
  boolean hasNoChange(RaftPeer[] newMembers) {
    return isStable()
        && conf.size() == newMembers.length
        && Arrays.stream(newMembers).allMatch(p -> conf.contains(p.getId()));
  }

  long getLogEntryIndex() {
    return logEntryIndex;
  }

  /** @return the given members that are not in the conf of the old configuration. */
  static Collection<RaftPeer> computeNewPeers(RaftPeer[] newMembers,
      RaftConfiguration old) {
    final List<RaftPeer> added = new ArrayList<>(Arrays.asList(newMembers));
    added.removeIf(p -> old.containsInConf(p.getId()));
    return added;
  }

  /**
   * @return a randomly chosen peer of the conf other than the given id;
   *         or null if there is no such peer.
   */
  RaftPeer getRandomPeer(String exclusiveId) {
    final List<RaftPeer> candidates = conf.getOtherPeers(exclusiveId);
    return candidates.isEmpty() ? null
        : candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
  }

  /** @return the peers of the old conf, or an empty list if there is no old conf. */
  Collection<RaftPeer> getPeersInOldConf() {
    if (oldConf == null) {
      return Collections.emptyList();
    }
    return oldConf.getPeers();
  }

  Collection<RaftPeer> getPeersInConf() {
    return conf.getPeers();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java deleted file mode 100644 index 6634152..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.raft.server.impl;

import org.apache.raft.client.RaftClient;

/** Constants shared by the raft server implementation. */
public interface RaftServerConstants {
  long INVALID_LOG_INDEX = -1;
  byte LOG_TERMINATE_BYTE = 0;
  long DEFAULT_SEQNUM = RaftClient.DEFAULT_SEQNUM;

  /** Startup options, each identified by its textual argument. */
  enum StartupOption {
    FORMAT("format"),
    REGULAR("regular");

    private final String option;

    StartupOption(String arg) {
      this.option = arg;
    }

    /**
     * @return the option whose text equals the given argument;
     *         or {@link #REGULAR} if no option matches.
     */
    public static StartupOption getOption(String arg) {
      StartupOption matched = REGULAR;
      for (StartupOption candidate : values()) {
        if (candidate.option.equals(arg)) {
          matched = candidate;
          break;
        }
      }
      return matched;
    }
  }
}
