http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java new file mode 100644 index 0000000..7b3845a --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.util.Daemon; +import org.apache.ratis.util.ProtoUtils; +import org.apache.ratis.util.Timestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +class LeaderElection extends Daemon { + public static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class); + + private ResultAndTerm logAndReturn(Result result, + List<RequestVoteReplyProto> responses, + List<Exception> exceptions, long newTerm) { + LOG.info(server.getId() + ": Election " + result + "; received " + + responses.size() + " response(s) " + + responses.stream().map(r -> ProtoUtils.toString(r)).collect(Collectors.toList()) + + " and " + exceptions.size() + " exception(s); " + server.getState()); + int i = 0; + for(Exception e : exceptions) { + LOG.info(" " + i++ + ": " + e); + LOG.trace("TRACE", e); + } + return new ResultAndTerm(result, newTerm); + } + + enum Result {PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN} + + private static class ResultAndTerm { + final Result result; + final long term; + + ResultAndTerm(Result result, long term) { + this.result = result; + this.term = term; + } + } + + 
private final RaftServerImpl server; + private ExecutorCompletionService<RequestVoteReplyProto> service; + private ExecutorService executor; + private volatile boolean running; + /** + * The Raft configuration should not change while the peer is in candidate + * state. If the configuration changes, another peer should be acting as a + * leader and this LeaderElection session should end. + */ + private final RaftConfiguration conf; + private final Collection<RaftPeer> others; + + LeaderElection(RaftServerImpl server) { + this.server = server; + conf = server.getRaftConf(); + others = conf.getOtherPeers(server.getId()); + this.running = true; + } + + void stopRunning() { + this.running = false; + } + + private void initExecutor() { + Preconditions.checkState(!others.isEmpty()); + executor = Executors.newFixedThreadPool(others.size(), + new ThreadFactoryBuilder().setDaemon(true).build()); + service = new ExecutorCompletionService<>(executor); + } + + @Override + public void run() { + try { + askForVotes(); + } catch (InterruptedException e) { + // the leader election thread is interrupted. The peer may already step + // down to a follower. The leader election should skip. + LOG.info(server.getId() + " " + getClass().getSimpleName() + + " thread is interrupted gracefully; server=" + server); + } catch (IOException e) { + LOG.warn("Failed to persist votedFor/term. Exit the leader election.", e); + stopRunning(); + } + } + + /** + * After a peer changes its role to candidate, it invokes this method to + * send out requestVote rpc to all other peers. 
+ */ + private void askForVotes() throws InterruptedException, IOException { + final ServerState state = server.getState(); + while (running && server.isCandidate()) { + // one round of requestVotes + final long electionTerm; + synchronized (server) { + electionTerm = state.initElection(); + server.getState().persistMetadata(); + } + LOG.info(state.getSelfId() + ": begin an election in Term " + + electionTerm); + + TermIndex lastEntry = ServerProtoUtils.toTermIndex( + state.getLog().getLastEntry()); + if (lastEntry == null) { + // lastEntry may need to be derived from snapshot + SnapshotInfo snapshot = state.getLatestSnapshot(); + if (snapshot != null) { + lastEntry = snapshot.getTermIndex(); + } + } + + final ResultAndTerm r; + if (others.isEmpty()) { + r = new ResultAndTerm(Result.PASSED, electionTerm); + } else { + try { + initExecutor(); + int submitted = submitRequests(electionTerm, lastEntry); + r = waitForResults(electionTerm, submitted); + } finally { + if (executor != null) { + executor.shutdown(); + } + } + } + + synchronized (server) { + if (electionTerm != state.getCurrentTerm() || !running || + !server.isCandidate()) { + return; // term already passed or no longer a candidate. + } + + switch (r.result) { + case PASSED: + server.changeToLeader(); + return; + case SHUTDOWN: + LOG.info("{} received shutdown response when requesting votes.", + server.getId()); + server.close(); + return; + case REJECTED: + case DISCOVERED_A_NEW_TERM: + final long term = r.term > server.getState().getCurrentTerm() ? 
+ r.term : server.getState().getCurrentTerm(); + server.changeToFollower(term, true); + return; + case TIMEOUT: + // should start another election + } + } + } + } + + private int submitRequests(final long electionTerm, final TermIndex lastEntry) { + int submitted = 0; + for (final RaftPeer peer : others) { + final RequestVoteRequestProto r = server.createRequestVoteRequest( + peer.getId(), electionTerm, lastEntry); + service.submit( + () -> server.getServerRpc().requestVote(r)); + submitted++; + } + return submitted; + } + + private ResultAndTerm waitForResults(final long electionTerm, + final int submitted) throws InterruptedException { + final Timestamp timeout = new Timestamp().addTimeMs(server.getRandomTimeoutMs()); + final List<RequestVoteReplyProto> responses = new ArrayList<>(); + final List<Exception> exceptions = new ArrayList<>(); + int waitForNum = submitted; + Collection<String> votedPeers = new ArrayList<>(); + while (waitForNum > 0 && running && server.isCandidate()) { + final long waitTime = -timeout.elapsedTimeMs(); + if (waitTime <= 0) { + return logAndReturn(Result.TIMEOUT, responses, exceptions, -1); + } + + try { + final Future<RequestVoteReplyProto> future = service.poll( + waitTime, TimeUnit.MILLISECONDS); + if (future == null) { + continue; // poll timeout, continue to return Result.TIMEOUT + } + + final RequestVoteReplyProto r = future.get(); + responses.add(r); + if (r.getShouldShutdown()) { + return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1); + } + if (r.getTerm() > electionTerm) { + return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses, + exceptions, r.getTerm()); + } + if (r.getServerReply().getSuccess()) { + votedPeers.add(r.getServerReply().getReplyId()); + if (conf.hasMajority(votedPeers, server.getId())) { + return logAndReturn(Result.PASSED, responses, exceptions, -1); + } + } + } catch(ExecutionException e) { + LOG.info("Got exception when requesting votes: " + e); + LOG.trace("TRACE", e); + exceptions.add(e); + 
} + waitForNum--; + } + // received all the responses + return logAndReturn(Result.REJECTED, responses, exceptions, -1); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java new file mode 100644 index 0000000..e4b2889 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -0,0 +1,601 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.impl; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_KEY; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY; +import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.STAGINGPROGRESS; +import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.STEPDOWN; +import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.UPDATECOMMIT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.ReconfigurationTimeoutException; +import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.shaded.proto.RaftProtos.LeaderNoOp; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.util.Daemon; +import org.apache.ratis.util.ProtoUtils; 
+import org.apache.ratis.util.Timestamp; +import org.slf4j.Logger; + +import com.google.common.base.Preconditions; + +/** + * States for leader only. It contains three different types of processors: + * 1. RPC senders: each thread is appending log to a follower + * 2. EventProcessor: a single thread updating the raft server's state based on + * status of log appending response + * 3. PendingRequestHandler: a handler sending back responses to clients when + * corresponding log entries are committed + */ +public class LeaderState { + private static final Logger LOG = RaftServerImpl.LOG; + + enum StateUpdateEventType { + STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS + } + + enum BootStrapProgress { + NOPROGRESS, PROGRESSING, CAUGHTUP + } + + static class StateUpdateEvent { + final StateUpdateEventType type; + final long newTerm; + + StateUpdateEvent(StateUpdateEventType type, long newTerm) { + this.type = type; + this.newTerm = newTerm; + } + } + + static final StateUpdateEvent UPDATE_COMMIT_EVENT = + new StateUpdateEvent(StateUpdateEventType.UPDATECOMMIT, -1); + static final StateUpdateEvent STAGING_PROGRESS_EVENT = + new StateUpdateEvent(StateUpdateEventType.STAGINGPROGRESS, -1); + + private final RaftServerImpl server; + private final RaftLog raftLog; + private final long currentTerm; + private volatile ConfigurationStagingState stagingState; + private List<List<FollowerInfo>> voterLists; + + /** + * The list of threads appending entries to followers. + * The list is protected by the RaftServer's lock. 
+ */ + private final List<LogAppender> senders; + private final BlockingQueue<StateUpdateEvent> eventQ; + private final EventProcessor processor; + private final PendingRequests pendingRequests; + private volatile boolean running = true; + + private final int stagingCatchupGap; + private final int snapshotChunkMaxSize; + private final int syncInterval; + + LeaderState(RaftServerImpl server, RaftProperties properties) { + this.server = server; + + stagingCatchupGap = properties.getInt( + RAFT_SERVER_STAGING_CATCHUP_GAP_KEY, + RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT); + snapshotChunkMaxSize = properties.getInt( + RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY, + RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT); + syncInterval = properties.getInt( + RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY, + RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT); + + final ServerState state = server.getState(); + this.raftLog = state.getLog(); + this.currentTerm = state.getCurrentTerm(); + eventQ = new ArrayBlockingQueue<>(4096); + processor = new EventProcessor(); + pendingRequests = new PendingRequests(server); + + final RaftConfiguration conf = server.getRaftConf(); + Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId()); + final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); + final long nextIndex = raftLog.getNextIndex(); + senders = new ArrayList<>(others.size()); + for (RaftPeer p : others) { + FollowerInfo f = new FollowerInfo(p, t, nextIndex, true); + senders.add(server.getLogAppenderFactory().getLogAppender(server, this, f)); + } + voterLists = divideFollowers(conf); + } + + void start() { + // In the beginning of the new term, replicate an empty entry in order + // to finally commit entries in the previous term. + // Also this message can help identify the last committed index when + // the leader peer is just started. 
+ final LogEntryProto placeHolder = LogEntryProto.newBuilder() + .setTerm(server.getState().getCurrentTerm()) + .setIndex(raftLog.getNextIndex()) + .setNoOp(LeaderNoOp.newBuilder()).build(); + raftLog.append(placeHolder); + + processor.start(); + startSenders(); + } + + private void startSenders() { + senders.forEach(Thread::start); + } + + void stop() { + this.running = false; + // do not interrupt event processor since it may be in the middle of logSync + for (LogAppender sender : senders) { + sender.stopSender(); + sender.interrupt(); + } + try { + pendingRequests.sendNotLeaderResponses(); + } catch (IOException e) { + LOG.warn("Caught exception in sendNotLeaderResponses", e); + } + } + + void notifySenders() { + senders.forEach(LogAppender::notifyAppend); + } + + boolean inStagingState() { + return stagingState != null; + } + + ConfigurationStagingState getStagingState() { + return stagingState; + } + + long getCurrentTerm() { + return currentTerm; + } + + int getSnapshotChunkMaxSize() { + return snapshotChunkMaxSize; + } + + int getSyncInterval() { + return syncInterval; + } + + /** + * Start bootstrapping new peers + */ + PendingRequest startSetConfiguration(SetConfigurationRequest request) { + Preconditions.checkState(running && !inStagingState()); + + RaftPeer[] peersInNewConf = request.getPeersInNewConf(); + Collection<RaftPeer> peersToBootStrap = RaftConfiguration + .computeNewPeers(peersInNewConf, server.getRaftConf()); + + // add the request to the pending queue + final PendingRequest pending = pendingRequests.addConfRequest(request); + + ConfigurationStagingState stagingState = new ConfigurationStagingState( + peersToBootStrap, new PeerConfiguration(Arrays.asList(peersInNewConf))); + Collection<RaftPeer> newPeers = stagingState.getNewPeers(); + // set the staging state + this.stagingState = stagingState; + + if (newPeers.isEmpty()) { + applyOldNewConf(); + } else { + // update the LeaderState's sender list + addSenders(newPeers); + } + return pending; 
+ } + + PendingRequest addPendingRequest(long index, RaftClientRequest request, + TransactionContext entry) { + return pendingRequests.addPendingRequest(index, request, entry); + } + + private void applyOldNewConf() { + final ServerState state = server.getState(); + final RaftConfiguration current = server.getRaftConf(); + final RaftConfiguration oldNewConf= stagingState.generateOldNewConf(current, + state.getLog().getNextIndex()); + // apply the (old, new) configuration to log, and use it as the current conf + long index = state.getLog().append(state.getCurrentTerm(), oldNewConf); + updateConfiguration(index, oldNewConf); + + this.stagingState = null; + notifySenders(); + } + + private void updateConfiguration(long logIndex, RaftConfiguration newConf) { + voterLists = divideFollowers(newConf); + server.getState().setRaftConf(logIndex, newConf); + } + + /** + * After receiving a setConfiguration request, the leader should update its + * RpcSender list. + */ + void addSenders(Collection<RaftPeer> newMembers) { + final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); + final long nextIndex = raftLog.getNextIndex(); + for (RaftPeer peer : newMembers) { + FollowerInfo f = new FollowerInfo(peer, t, nextIndex, false); + LogAppender sender = server.getLogAppenderFactory() + .getLogAppender(server, this, f); + senders.add(sender); + sender.start(); + } + } + + /** + * Update the RpcSender list based on the current configuration + */ + private void updateSenders(RaftConfiguration conf) { + Preconditions.checkState(conf.isStable() && !inStagingState()); + Iterator<LogAppender> iterator = senders.iterator(); + while (iterator.hasNext()) { + LogAppender sender = iterator.next(); + if (!conf.containsInConf(sender.getFollower().getPeer().getId())) { + iterator.remove(); + sender.stopSender(); + sender.interrupt(); + } + } + } + + void submitUpdateStateEvent(StateUpdateEvent event) { + try { + eventQ.put(event); + } catch (InterruptedException e) { + 
LOG.info("Interrupted when adding event {} into the queue", event); + } + } + + private void prepare() { + synchronized (server) { + if (running) { + final RaftConfiguration conf = server.getRaftConf(); + if (conf.isTransitional() && server.getState().isConfCommitted()) { + // the configuration is in transitional state, and has been committed + // so it is time to generate and replicate (new) conf. + replicateNewConf(); + } + } + } + } + + /** + * The processor thread takes the responsibility to update the raft server's + * state, such as changing to follower, or updating the committed index. + */ + private class EventProcessor extends Daemon { + @Override + public void run() { + // apply an empty message; check if necessary to replicate (new) conf + prepare(); + + while (running) { + try { + StateUpdateEvent event = eventQ.poll(server.getMaxTimeoutMs(), + TimeUnit.MILLISECONDS); + synchronized (server) { + if (running) { + handleEvent(event); + } + } + // the updated configuration does not need to be sync'ed here + } catch (InterruptedException e) { + final String s = server.getId() + " " + getClass().getSimpleName() + + " thread is interrupted "; + if (!running) { + LOG.info(s + " gracefully; server=" + server); + } else { + LOG.warn(s + " UNEXPECTEDLY; server=" + server, e); + throw new RuntimeException(e); + } + } catch (IOException e) { + LOG.warn("Failed to persist new votedFor/term.", e); + // the failure should happen while changing the state to follower + // thus the in-memory state should have been updated + Preconditions.checkState(!running); + } + } + } + } + + private void handleEvent(StateUpdateEvent e) throws IOException { + if (e == null) { + if (inStagingState()) { + checkNewPeers(); + } + } else { + if (e.type == STEPDOWN) { + server.changeToFollower(e.newTerm, true); + } else if (e.type == UPDATECOMMIT) { + updateLastCommitted(); + } else if (e.type == STAGINGPROGRESS) { + checkNewPeers(); + } + } + } + + /** + * So far we use a simple 
implementation for catchup checking: + * 1. If the latest rpc time of the remote peer is before 3 * max_timeout, + * the peer made no progress for that long. We should fail the whole + * setConfiguration request. + * 2. If the peer's matching index is just behind for a small gap, and the + * peer was updated recently (within max_timeout), declare the peer as + * caught-up. + * 3. Otherwise the peer is making progressing. Keep waiting. + */ + private BootStrapProgress checkProgress(FollowerInfo follower, + long committed) { + Preconditions.checkArgument(!follower.isAttendingVote()); + final Timestamp progressTime = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); + final Timestamp timeoutTime = new Timestamp().addTimeMs(-3*server.getMaxTimeoutMs()); + if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) { + LOG.debug("{} detects a follower {} timeout for bootstrapping," + + " timeoutTime: {}", server.getId(), follower, timeoutTime); + return BootStrapProgress.NOPROGRESS; + } else if (follower.getMatchIndex() + stagingCatchupGap > committed + && follower.getLastRpcResponseTime().compareTo(progressTime) > 0) { + return BootStrapProgress.CAUGHTUP; + } else { + return BootStrapProgress.PROGRESSING; + } + } + + private Collection<BootStrapProgress> checkAllProgress(long committed) { + Preconditions.checkState(inStagingState()); + return senders.stream() + .filter(sender -> !sender.getFollower().isAttendingVote()) + .map(sender -> checkProgress(sender.getFollower(), committed)) + .collect(Collectors.toCollection(ArrayList::new)); + } + + private void checkNewPeers() { + if (!inStagingState()) { + // it is possible that the bootstrapping is done and we still have + // remaining STAGINGPROGRESS event to handle. 
+ updateLastCommitted(); + } else { + final long committedIndex = server.getState().getLog() + .getLastCommittedIndex(); + Collection<BootStrapProgress> reports = checkAllProgress(committedIndex); + if (reports.contains(BootStrapProgress.NOPROGRESS)) { + LOG.debug("{} fails the setConfiguration request", server.getId()); + stagingState.fail(); + } else if (!reports.contains(BootStrapProgress.PROGRESSING)) { + // all caught up! + applyOldNewConf(); + for (LogAppender sender : senders) { + sender.getFollower().startAttendVote(); + } + } + } + } + + boolean isBootStrappingPeer(String peerId) { + return inStagingState() && getStagingState().contains(peerId); + } + + private void updateLastCommitted() { + final String selfId = server.getId(); + final RaftConfiguration conf = server.getRaftConf(); + long majorityInNewConf = computeLastCommitted(voterLists.get(0), + conf.containsInConf(selfId)); + final long oldLastCommitted = raftLog.getLastCommittedIndex(); + final LogEntryProto[] entriesToCommit; + if (!conf.isTransitional()) { + // copy the entries that may get committed out of the raftlog, to prevent + // the possible race that the log gets purged after the statemachine does + // a snapshot + entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, + Math.max(majorityInNewConf, oldLastCommitted) + 1); + server.getState().updateStatemachine(majorityInNewConf, currentTerm); + } else { // configuration is in transitional state + long majorityInOldConf = computeLastCommitted(voterLists.get(1), + conf.containsInOldConf(selfId)); + final long majority = Math.min(majorityInNewConf, majorityInOldConf); + entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, + Math.max(majority, oldLastCommitted) + 1); + server.getState().updateStatemachine(majority, currentTerm); + } + checkAndUpdateConfiguration(entriesToCommit); + } + + private boolean committedConf(LogEntryProto[] entries) { + final long currentCommitted = raftLog.getLastCommittedIndex(); + for (LogEntryProto entry 
: entries) { + if (entry.getIndex() <= currentCommitted && + ProtoUtils.isConfigurationLogEntry(entry)) { + return true; + } + } + return false; + } + + private void checkAndUpdateConfiguration(LogEntryProto[] entriesToCheck) { + final RaftConfiguration conf = server.getRaftConf(); + if (committedConf(entriesToCheck)) { + if (conf.isTransitional()) { + replicateNewConf(); + } else { // the (new) log entry has been committed + LOG.debug("{} sends success to setConfiguration request", server.getId()); + pendingRequests.replySetConfiguration(); + // if the leader is not included in the current configuration, step down + if (!conf.containsInConf(server.getId())) { + LOG.info("{} is not included in the new configuration {}. Step down.", + server.getId(), conf); + try { + // leave some time for all RPC senders to send out new conf entry + Thread.sleep(server.getMinTimeoutMs()); + } catch (InterruptedException ignored) { + } + // the pending request handler will send NotLeaderException for + // pending client requests when it stops + server.close(); + } + } + } + } + + /** + * when the (old, new) log entry has been committed, should replicate (new): + * 1) append (new) to log + * 2) update conf to (new) + * 3) update RpcSenders list + * 4) start replicating the log entry + */ + private void replicateNewConf() { + final RaftConfiguration conf = server.getRaftConf(); + final RaftConfiguration newConf = RaftConfiguration.newBuilder() + .setConf(conf) + .setLogEntryIndex(raftLog.getNextIndex()) + .build(); + // stop the LogAppender if the corresponding follower is no longer in the conf + updateSenders(newConf); + long index = raftLog.append(server.getState().getCurrentTerm(), newConf); + updateConfiguration(index, newConf); + notifySenders(); + } + + private long computeLastCommitted(List<FollowerInfo> followers, + boolean includeSelf) { + final int length = includeSelf ? 
followers.size() + 1 : followers.size(); + final long[] indices = new long[length]; + for (int i = 0; i < followers.size(); i++) { + indices[i] = followers.get(i).getMatchIndex(); + } + if (includeSelf) { + // note that we also need to wait for the local disk I/O + indices[length - 1] = raftLog.getLatestFlushedIndex(); + } + + Arrays.sort(indices); + return indices[(indices.length - 1) / 2]; + } + + private List<List<FollowerInfo>> divideFollowers(RaftConfiguration conf) { + List<List<FollowerInfo>> lists = new ArrayList<>(2); + List<FollowerInfo> listForNew = senders.stream() + .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId())) + .map(LogAppender::getFollower) + .collect(Collectors.toList()); + lists.add(listForNew); + if (conf.isTransitional()) { + List<FollowerInfo> listForOld = senders.stream() + .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId())) + .map(LogAppender::getFollower) + .collect(Collectors.toList()); + lists.add(listForOld); + } + return lists; + } + + PendingRequest returnNoConfChange(SetConfigurationRequest r) { + PendingRequest pending = new PendingRequest(r); + pending.setSuccessReply(null); + return pending; + } + + void replyPendingRequest(long logIndex, CompletableFuture<Message> message) { + pendingRequests.replyPendingRequest(logIndex, message); + } + + TransactionContext getTransactionContext(long index) { + return pendingRequests.getTransactionContext(index); + } + + private class ConfigurationStagingState { + private final Map<String, RaftPeer> newPeers; + private final PeerConfiguration newConf; + + ConfigurationStagingState(Collection<RaftPeer> newPeers, + PeerConfiguration newConf) { + Map<String, RaftPeer> map = new HashMap<>(); + for (RaftPeer peer : newPeers) { + map.put(peer.getId(), peer); + } + this.newPeers = Collections.unmodifiableMap(map); + this.newConf = newConf; + } + + RaftConfiguration generateOldNewConf(RaftConfiguration current, + long logIndex) { + return 
RaftConfiguration.newBuilder() + .setConf(newConf) + .setOldConf(current) + .setLogEntryIndex(logIndex) + .build(); + } + + Collection<RaftPeer> getNewPeers() { + return newPeers.values(); + } + + boolean contains(String peerId) { + return newPeers.containsKey(peerId); + } + + void fail() { + Iterator<LogAppender> iterator = senders.iterator(); + while (iterator.hasNext()) { + LogAppender sender = iterator.next(); + if (!sender.getFollower().isAttendingVote()) { + iterator.remove(); + sender.stopSender(); + sender.interrupt(); + } + } + LeaderState.this.stagingState = null; + // send back failure response to client's request + pendingRequests.failSetConfiguration( + new ReconfigurationTimeoutException("Fail to set configuration " + + newConf + ". Timeout when bootstrapping new peers.")); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java new file mode 100644 index 0000000..5599699 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -0,0 +1,494 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY; +import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.UUID; + +import org.apache.commons.io.IOUtils; +import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.FileInfo; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; +import 
org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotResult; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.util.Daemon; +import org.apache.ratis.util.ProtoUtils; +import org.apache.ratis.util.Timestamp; +import org.slf4j.Logger; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +
/**
 * A daemon thread appending log entries to a follower peer.
 *
 * When the follower's next index is behind the local log start (or the log is
 * empty but a snapshot exists), the latest snapshot is installed instead of
 * sending log entries.
 */
public class LogAppender extends Daemon {
  public static final Logger LOG = RaftServerImpl.LOG;

  protected final RaftServerImpl server;
  private final LeaderState leaderState;
  protected final RaftLog raftLog;
  protected final FollowerInfo follower;
  /** Buffer capacity in serialized bytes; see {@link LogEntryBuffer#isFull()}. */
  private final int maxBufferSize;
  /** If false, a request is sent as soon as any new entries are buffered. */
  private final boolean batchSending;
  private final LogEntryBuffer buffer;
  /** The current term read when this appender was constructed. */
  private final long leaderTerm;

  /** Cleared by {@link #stopSender()}; checked by the send loop and before each RPC. */
  private volatile boolean sending = true;

  public LogAppender(RaftServerImpl server, LeaderState leaderState, FollowerInfo f) {
    this.follower = f;
    this.server = server;
    this.leaderState = leaderState;
    this.raftLog = server.getState().getLog();
    this.maxBufferSize = server.getProperties().getInt(
        RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY,
        RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT);
    this.batchSending = server.getProperties().getBoolean(
        RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY,
        RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT);
    // must come after maxBufferSize is assigned: the buffer reads it
    this.buffer = new LogEntryBuffer();
    this.leaderTerm = server.getState().getCurrentTerm();
  }

  @Override
  public String toString() {
    return getClass().getSimpleName() + "(" + server.getId() + " -> " +
        follower.getPeer().getId() + ")";
  }

  /** Runs the send loop until stopped; an interruption is logged and ends the thread. */
  @Override
  public void run() {
    try {
      checkAndSendAppendEntries();
    } catch (InterruptedException | InterruptedIOException e) {
      LOG.info(this + " was interrupted: " + e);
    }
  }

  protected boolean isAppenderRunning() {
    return sending;
  }

  /** Asks the send loop to stop; it exits at its next check of {@link #isAppenderRunning()}. */
  public void stopSender() {
    this.sending = false;
  }

  public FollowerInfo getFollower() {
    return follower;
  }

  /**
   * A buffer for log entries with size limitation.
   */
  private class LogEntryBuffer {
    private final List<LogEntryProto> buf = new ArrayList<>();
    /** Sum of the serialized sizes of the entries in {@link #buf}. */
    private int totalSize = 0;

    void addEntry(LogEntryProto entry) {
      buf.add(entry);
      totalSize += entry.getSerializedSize();
    }

    boolean isFull() {
      return totalSize >= maxBufferSize;
    }

    boolean isEmpty() {
      return buf.isEmpty();
    }

    /** Builds a request carrying all buffered entries, then empties the buffer. */
    AppendEntriesRequestProto getAppendRequest(TermIndex previous) {
      final AppendEntriesRequestProto request = server
          .createAppendEntriesRequest(leaderTerm, follower.getPeer().getId(),
              previous, buf, !follower.isAttendingVote());
      buf.clear();
      totalSize = 0;
      return request;
    }

    int getPendingEntryNum() {
      return buf.size();
    }
  }

  /**
   * @return the term-index of the entry just before the follower's next index;
   *         if that entry is not in the local log, the latest snapshot's
   *         term-index, or null when there is no snapshot either.
   */
  private TermIndex getPrevious() {
    TermIndex previous = ServerProtoUtils.toTermIndex(
        raftLog.get(follower.getNextIndex() - 1));
    if (previous == null) {
      // if previous is null, nextIndex must be equal to the log start
      // index (otherwise we will install snapshot).
      Preconditions.checkState(follower.getNextIndex() == raftLog.getStartIndex(),
          "follower's next index %s, local log start index %s",
          follower.getNextIndex(), raftLog.getStartIndex());
      SnapshotInfo snapshot = server.getState().getLatestSnapshot();
      previous = snapshot == null ? null : snapshot.getTermIndex();
    }
    return previous;
  }

  /**
   * Builds the next appendEntries request, or returns null if nothing should be
   * sent yet.
   *
   * New log entries are accumulated in the buffer. A request is produced when:
   * <ul>
   *   <li>the buffer is full; or</li>
   *   <li>batch sending is disabled; or</li>
   *   <li>no newer entries exist and the buffer is non-empty; or</li>
   *   <li>a heartbeat is due (in which case the request may carry no entries).</li>
   * </ul>
   */
  protected AppendEntriesRequestProto createRequest() {
    final TermIndex previous = getPrevious();
    final long leaderNext = raftLog.getNextIndex();
    long next = follower.getNextIndex() + buffer.getPendingEntryNum();
    boolean toSend = false;

    if (leaderNext == next && !buffer.isEmpty()) {
      // no new entries, then send out the entries in the buffer
      toSend = true;
    } else if (leaderNext > next) {
      while (leaderNext > next && !buffer.isFull()) {
        // stop adding entry once the buffer size is >= the max size
        buffer.addEntry(raftLog.get(next++));
      }
      if (buffer.isFull() || !batchSending) {
        // buffer is full or batch sending is disabled, send out a request
        toSend = true;
      }
    }

    if (toSend || shouldHeartbeat()) {
      return buffer.getAppendRequest(previous);
    }
    return null;
  }

  /**
   * Send an appendEntries RPC; retry indefinitely on IOException.
   *
   * A request carrying entries is reused across retries; an empty one
   * (heartbeat) is rebuilt each attempt. Between failed attempts the thread
   * sleeps for {@code leaderState.getSyncInterval()}.
   *
   * @return the reply; or null if there was nothing to send or the appender
   *         was stopped.
   */
  private AppendEntriesReplyProto sendAppendEntriesWithRetries()
      throws InterruptedException, InterruptedIOException {
    int retry = 0;
    AppendEntriesRequestProto request = null;
    while (isAppenderRunning()) { // keep retrying for IOException
      try {
        if (request == null || request.getEntriesCount() == 0) {
          request = createRequest();
        }

        if (request == null) {
          LOG.trace("{} need not send AppendEntries now." +
              " Wait for more entries.", server.getId());
          return null;
        } else if (!isAppenderRunning()) {
          LOG.debug("LogAppender {} has been stopped. Skip the request.", this);
          return null;
        }

        follower.updateLastRpcSendTime();
        final AppendEntriesReplyProto r = server.getServerRpc()
            .appendEntries(request);
        follower.updateLastRpcResponseTime();

        return r;
      } catch (InterruptedIOException iioe) {
        throw iioe;
      } catch (IOException ioe) {
        LOG.debug(this + ": failed to send appendEntries; retry " + retry++, ioe);
      }
      if (isAppenderRunning()) {
        Thread.sleep(leaderState.getSyncInterval());
      }
    }
    return null;
  }

  /**
   * Produces the InstallSnapshot requests for a snapshot.
   *
   * Files are read one after another. Each request carries one chunk of at
   * most {@code leaderState.getSnapshotChunkMaxSize()} bytes.
   *
   * At most one snapshot file is open at a time. Callers that may stop before
   * the last request should {@link #close()} this object (e.g. with
   * try-with-resources) so that file is released.
   */
  protected class SnapshotRequestIter
      implements Iterable<InstallSnapshotRequestProto>, AutoCloseable {
    private final SnapshotInfo snapshot;
    private final List<FileInfo> files;
    /** Stream over the file currently being read, if any. */
    private FileInputStream in;
    private int fileIndex = 0;

    private FileInfo currentFileInfo;
    private byte[] currentBuf;
    private long currentFileSize;
    private long currentOffset = 0;
    private int chunkIndex = 0;

    private final String requestId;
    private int requestIndex = 0;

    public SnapshotRequestIter(SnapshotInfo snapshot, String requestId)
        throws IOException {
      this.snapshot = snapshot;
      this.requestId = requestId;
      this.files = snapshot.getFiles();
      if (files.size() > 0) {
        startReadFile();
      }
    }

    /** Opens files[fileIndex] and resets the per-file read state. */
    private void startReadFile() throws IOException {
      currentFileInfo = files.get(fileIndex);
      File snapshotFile = currentFileInfo.getPath().toFile();
      currentFileSize = snapshotFile.length();
      final int bufLength =
          (int) Math.min(leaderState.getSnapshotChunkMaxSize(), currentFileSize);
      currentBuf = new byte[bufLength];
      currentOffset = 0;
      chunkIndex = 0;
      in = new FileInputStream(snapshotFile);
    }

    @Override
    public Iterator<InstallSnapshotRequestProto> iterator() {
      return new Iterator<InstallSnapshotRequestProto>() {
        @Override
        public boolean hasNext() {
          return fileIndex < files.size();
        }

        @Override
        public InstallSnapshotRequestProto next() {
          if (fileIndex >= files.size()) {
            throw new NoSuchElementException();
          }
          int targetLength = (int) Math.min(currentFileSize - currentOffset,
              leaderState.getSnapshotChunkMaxSize());
          FileChunkProto chunk;
          try {
            chunk = readFileChunk(currentFileInfo, in, currentBuf,
                targetLength, currentOffset, chunkIndex);
            // the whole snapshot is done only with the last chunk of the last file
            boolean done = (fileIndex == files.size() - 1) &&
                chunk.getDone();
            InstallSnapshotRequestProto request =
                server.createInstallSnapshotRequest(follower.getPeer().getId(),
                    requestId, requestIndex++, snapshot,
                    Lists.newArrayList(chunk), done);
            currentOffset += targetLength;
            chunkIndex++;

            if (currentOffset >= currentFileSize) {
              in.close();
              fileIndex++;
              if (fileIndex < files.size()) {
                startReadFile();
              }
            }

            return request;
          } catch (IOException e) {
            if (in != null) {
              try {
                in.close();
              } catch (IOException ignored) {
              }
            }
            LOG.warn("Got exception when preparing InstallSnapshot request", e);
            throw new RuntimeException(e);
          }
        }
      };
    }

    /**
     * Closes the snapshot file currently open for reading, if any.
     * Safe to call more than once, including after iteration has finished.
     */
    @Override
    public void close() {
      if (in != null) {
        try {
          in.close();
        } catch (IOException e) {
          LOG.warn(LogAppender.this + ": failed to close snapshot file "
              + currentFileInfo.getPath(), e);
        }
      }
    }
  }

  /**
   * Reads {@code length} bytes from the current position of {@code in} into a
   * chunk for {@code fileInfo}.
   *
   * The chunk is marked done when it ends exactly at
   * {@code fileInfo.getFileSize()}.
   */
  private FileChunkProto readFileChunk(FileInfo fileInfo,
      FileInputStream in, byte[] buf, int length, long offset, int chunkIndex)
      throws IOException {
    FileChunkProto.Builder builder = FileChunkProto.newBuilder()
        .setOffset(offset).setChunkIndex(chunkIndex);
    IOUtils.readFully(in, buf, 0, length);
    Path relativePath = server.getState().getStorage().getStorageDir()
        .relativizeToRoot(fileInfo.getPath());
    builder.setFilename(relativePath.toString());
    builder.setDone(offset + length == fileInfo.getFileSize());
    builder.setFileDigest(
        ByteString.copyFrom(fileInfo.getFileDigest().getDigest()));
    builder.setData(ByteString.copyFrom(buf, 0, length));
    return builder.build();
  }

  /**
   * Sends the whole snapshot to the follower, one InstallSnapshot RPC per chunk.
   *
   * @return the last reply received. This is an unsuccessful reply if the
   *         follower rejected a chunk. It is null if an exception occurred or
   *         no request was sent. On full success the follower's match index
   *         and next index are advanced past the snapshot.
   */
  private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot)
      throws InterruptedException, InterruptedIOException {
    String requestId = UUID.randomUUID().toString();
    InstallSnapshotReplyProto reply = null;
    // try-with-resources releases the open snapshot file even when we stop
    // early because of a rejected chunk or an RPC failure
    try (SnapshotRequestIter requests = new SnapshotRequestIter(snapshot, requestId)) {
      for (InstallSnapshotRequestProto request : requests) {
        follower.updateLastRpcSendTime();
        reply = server.getServerRpc().installSnapshot(request);
        follower.updateLastRpcResponseTime();

        if (!reply.getServerReply().getSuccess()) {
          return reply;
        }
      }
    } catch (InterruptedIOException iioe) {
      throw iioe;
    } catch (Exception ioe) {
      LOG.warn(this + ": failed to install SnapshotInfo " + snapshot.getFiles(),
          ioe);
      return null;
    }

    if (reply != null) {
      follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
      follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
      LOG.info("{}: install snapshot-{} successfully on follower {}",
          server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer());
    }
    return reply;
  }

  /** @return the snapshot to install on the follower, or null if none is needed. */
  protected SnapshotInfo shouldInstallSnapshot() {
    final long logStartIndex = raftLog.getStartIndex();
    // we should install snapshot if the follower needs to catch up and:
    // 1. there is no local log entry but there is snapshot
    // 2. or the follower's next index is smaller than the log start index
    if (follower.getNextIndex() < raftLog.getNextIndex()) {
      SnapshotInfo snapshot = server.getState().getLatestSnapshot();
      if (follower.getNextIndex() < logStartIndex ||
          (logStartIndex == INVALID_LOG_INDEX && snapshot != null)) {
        return snapshot;
      }
    }
    return null;
  }

  /**
   * Check and send appendEntries RPC.
   *
   * This is the main loop. Each round either installs a snapshot or sends
   * appendEntries. When there are no new entries to append, it then waits on
   * this object's monitor until the next heartbeat is due or
   * {@link #notifyAppend()} is called.
   */
  private void checkAndSendAppendEntries()
      throws InterruptedException, InterruptedIOException {
    while (isAppenderRunning()) {
      if (shouldSendRequest()) {
        SnapshotInfo snapshot = shouldInstallSnapshot();
        if (snapshot != null) {
          LOG.info("{}: follower {}'s next index is {}," +
                  " log's start index is {}, need to install snapshot",
              server.getId(), follower.getPeer(), follower.getNextIndex(),
              raftLog.getStartIndex());

          final InstallSnapshotReplyProto r = installSnapshot(snapshot);
          if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
            checkResponseTerm(r.getTerm());
          } // otherwise if r is null, retry the snapshot installation
        } else {
          final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
          if (r != null) {
            handleReply(r);
          }
        }
      }
      if (isAppenderRunning() && !shouldAppendEntries(
          follower.getNextIndex() + buffer.getPendingEntryNum())) {
        final long waitTime = getHeartbeatRemainingTime(
            follower.getLastRpcTime());
        if (waitTime > 0) {
          synchronized (this) {
            wait(waitTime);
          }
        }
      }
    }
  }

  /**
   * Updates the follower's progress from an appendEntries reply.
   *
   * SUCCESS advances the match index and next index. NOT_LEADER may trigger a
   * step-down. INCONSISTENCY moves the next index back.
   */
  private void handleReply(AppendEntriesReplyProto reply) {
    if (reply != null) {
      switch (reply.getResult()) {
        case SUCCESS:
          final long oldNextIndex = follower.getNextIndex();
          final long nextIndex = reply.getNextIndex();
          if (nextIndex < oldNextIndex) {
            throw new IllegalStateException("nextIndex=" + nextIndex
                + " < oldNextIndex=" + oldNextIndex
                + ", reply=" + ProtoUtils.toString(reply));
          }

          if (nextIndex > oldNextIndex) {
            follower.updateMatchIndex(nextIndex - 1);
            follower.updateNextIndex(nextIndex);
            submitEventOnSuccessAppend();
          }
          break;
        case NOT_LEADER:
          // check if should step down
          checkResponseTerm(reply.getTerm());
          break;
        case INCONSISTENCY:
          follower.decreaseNextIndex(reply.getNextIndex());
          break;
        case UNRECOGNIZED:
          LOG.warn("{} received UNRECOGNIZED AppendResult from {}",
              server.getId(), follower.getPeer().getId());
          break;
      }
    }
  }

  /**
   * Notifies the leader state of follower progress.
   *
   * Voting followers trigger a commit update; non-voting (staging) followers
   * trigger a staging-progress update.
   */
  protected void submitEventOnSuccessAppend() {
    LeaderState.StateUpdateEvent e = follower.isAttendingVote() ?
        LeaderState.UPDATE_COMMIT_EVENT :
        LeaderState.STAGING_PROGRESS_EVENT;
    leaderState.submitUpdateStateEvent(e);
  }

  /** Wakes up the send loop if it is waiting for new entries. */
  public synchronized void notifyAppend() {
    this.notify();
  }

  /** Should the leader send appendEntries RPC to this follower? */
  protected boolean shouldSendRequest() {
    return shouldAppendEntries(follower.getNextIndex()) || shouldHeartbeat();
  }

  private boolean shouldAppendEntries(long followerIndex) {
    return followerIndex < raftLog.getNextIndex();
  }

  private boolean shouldHeartbeat() {
    return getHeartbeatRemainingTime(follower.getLastRpcTime()) <= 0;
  }

  /**
   * @return the time in milliseconds that the leader should send a heartbeat.
   *         This is half the minimum timeout minus the time elapsed since
   *         {@code lastTime}.
   */
  protected long getHeartbeatRemainingTime(Timestamp lastTime) {
    return server.getMinTimeoutMs() / 2 - lastTime.elapsedTimeMs();
  }

  /**
   * Submits a STEPDOWN event if a voting follower reports a term newer than
   * the leader's current term.
   */
  protected void checkResponseTerm(long responseTerm) {
    synchronized (server) {
      if (isAppenderRunning() && follower.isAttendingVote()
          && responseTerm > leaderState.getCurrentTerm()) {
        leaderState.submitUpdateStateEvent(
            new LeaderState.StateUpdateEvent(StateUpdateEventType.STEPDOWN,
                responseTerm));
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java new file mode 100644 index 0000000..e6cc213 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; +
/**
 * Creates the {@link LogAppender} a leader uses to replicate its log to a
 * single follower.
 */
public interface LogAppenderFactory {
  /**
   * @param server the local raft server
   * @param state the leader state the appender works on behalf of
   * @param f the follower the appender sends to
   * @return the appender to use for {@code f}
   */
  LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
      FollowerInfo f);

  /** Default factory: instantiates the base {@link LogAppender} directly. */
  class SynchronousLogAppenderFactory implements LogAppenderFactory {
    @Override
    public LogAppender getLogAppender(RaftServerImpl raftServer,
        LeaderState leaderState, FollowerInfo followerInfo) {
      final LogAppender appender =
          new LogAppender(raftServer, leaderState, followerInfo);
      return appender;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java new file mode 100644 index 0000000..b532303 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import com.google.common.base.Preconditions; + +import java.util.*; + +import org.apache.ratis.protocol.RaftPeer; +
/**
 * The peer configuration of a raft cluster.
 *
 * The objects of this class are immutable.
 */
class PeerConfiguration {
  /** The members, keyed by peer id; unmodifiable. */
  private final Map<String, RaftPeer> peers;

  /**
   * @param peers the members; must be non-null and contain at least one peer.
   *              A later peer with an id already seen replaces the earlier one.
   */
  PeerConfiguration(Iterable<RaftPeer> peers) {
    Preconditions.checkNotNull(peers);
    Map<String, RaftPeer> map = new HashMap<>();
    for (RaftPeer p : peers) {
      map.put(p.getId(), p);
    }
    this.peers = Collections.unmodifiableMap(map);
    Preconditions.checkState(!this.peers.isEmpty());
  }

  /** @return an unmodifiable view of the members. */
  Collection<RaftPeer> getPeers() {
    return Collections.unmodifiableCollection(peers.values());
  }

  int size() {
    return peers.size();
  }

  @Override
  public String toString() {
    return peers.values().toString();
  }

  /** @return the member with the given id, or null if there is none. */
  RaftPeer getPeer(String id) {
    return peers.get(id);
  }

  boolean contains(String id) {
    return peers.containsKey(id);
  }

  /**
   * @return a new, mutable list of all members whose id is not {@code selfId}.
   */
  List<RaftPeer> getOtherPeers(String selfId) {
    List<RaftPeer> others = new ArrayList<>();
    for (RaftPeer p : peers.values()) {
      if (!selfId.equals(p.getId())) {
        others.add(p);
      }
    }
    return others;
  }

  /**
   * @param others ids of other peers; must not contain {@code selfId}
   * @param selfId the local peer id
   * @return true if the members among {@code selfId} and {@code others} are
   *         strictly more than half of this configuration.
   */
  boolean hasMajority(Collection<String> others, String selfId) {
    Preconditions.checkArgument(!others.contains(selfId));
    final int majority = size() / 2;
    int num = 0;
    if (contains(selfId)) {
      num++;
    }
    for (String other : others) {
      if (contains(other)) {
        num++;
        if (num > majority) {
          return true; // early exit once a majority is reached
        }
      }
    }
    // Also checked here so that a single-member configuration containing
    // self is a majority even when others is empty.
    return num > majority;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java new file mode 100644 index 0000000..bf47cdc --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.ratis.server.impl; + +import com.google.common.base.Preconditions; + +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.statemachine.TransactionContext; + +import java.util.concurrent.CompletableFuture; +
/**
 * A client request waiting for its reply.
 *
 * Each request is tied to a log index and exposes a future completed with
 * the reply or an exception. Ordering via {@link #compareTo} is by log index
 * only and is not consistent with {@code equals}.
 */
public class PendingRequest implements Comparable<PendingRequest> {
  /** Stored as a primitive: there is no absent index (INVALID_LOG_INDEX is used instead). */
  private final long index;
  private final RaftClientRequest request;
  private final TransactionContext entry;
  private final CompletableFuture<RaftClientReply> future;

  PendingRequest(long index, RaftClientRequest request,
      TransactionContext entry) {
    this.index = index;
    this.request = request;
    this.entry = entry;
    this.future = new CompletableFuture<>();
  }

  /** A setConfiguration request, which has no log index or transaction yet. */
  PendingRequest(SetConfigurationRequest request) {
    this(RaftServerConstants.INVALID_LOG_INDEX, request, null);
  }

  long getIndex() {
    return index;
  }

  RaftClientRequest getRequest() {
    return request;
  }

  public CompletableFuture<RaftClientReply> getFuture() {
    return future;
  }

  TransactionContext getEntry() {
    return entry;
  }

  /** Completes the future exceptionally; {@code e} must not be null. */
  synchronized void setException(Throwable e) {
    Preconditions.checkArgument(e != null);
    future.completeExceptionally(e);
  }

  /** Completes the future with {@code r}, which must not be null. */
  synchronized void setReply(RaftClientReply r) {
    Preconditions.checkArgument(r != null);
    future.complete(r);
  }

  /** Completes the future with a successful reply carrying {@code message}. */
  void setSuccessReply(Message message) {
    setReply(new RaftClientReply(getRequest(), message));
  }

  @Override
  public int compareTo(PendingRequest that) {
    return Long.compare(this.index, that.index);
  }

  @Override
  public String toString() {
    return getClass().getSimpleName() + "(index=" + index
        + ", request=" + request + ")";
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java new file mode 100644 index 0000000..6343344 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.impl; + +import com.google.common.base.Preconditions; + +import org.apache.ratis.protocol.*; +import org.apache.ratis.statemachine.TransactionContext; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; +
/**
 * Client requests that have not been answered yet.
 *
 * Ordinary requests are keyed by log index. At most one setConfiguration
 * request is tracked separately.
 */
class PendingRequests {
  private static final Logger LOG = RaftServerImpl.LOG;

  private PendingRequest pendingSetConf;
  private final RaftServerImpl server;
  /** Unanswered requests by log index; entries are removed when replied. */
  private final ConcurrentMap<Long, PendingRequest> pendingRequests = new ConcurrentHashMap<>();
  /** The most recently added request; used to check indices are consecutive. */
  private PendingRequest last = null;

  PendingRequests(RaftServerImpl server) {
    this.server = server;
  }

  /**
   * Registers a write request at {@code index}.
   *
   * @throws IllegalArgumentException if the request is read-only
   * @throws IllegalStateException if {@code index} does not directly follow
   *         the previously added request
   */
  PendingRequest addPendingRequest(long index, RaftClientRequest request,
      TransactionContext entry) {
    // externally synced for now
    Preconditions.checkArgument(!request.isReadOnly());
    Preconditions.checkState(last == null || index == last.getIndex() + 1);
    return add(index, request, entry);
  }

  private PendingRequest add(long index, RaftClientRequest request,
      TransactionContext entry) {
    final PendingRequest pending = new PendingRequest(index, request, entry);
    pendingRequests.put(index, pending);
    last = pending;
    return pending;
  }

  /** @throws IllegalStateException if a setConfiguration request is already pending */
  PendingRequest addConfRequest(SetConfigurationRequest request) {
    Preconditions.checkState(pendingSetConf == null);
    pendingSetConf = new PendingRequest(request);
    return pendingSetConf;
  }

  void replySetConfiguration() {
    // we allow the pendingRequest to be null in case that the new leader
    // commits the new configuration while it has not received the retry
    // request from the client
    if (pendingSetConf != null) {
      // for setConfiguration we do not need to wait for statemachine. send back
      // reply after it's committed.
      pendingSetConf.setSuccessReply(null);
      pendingSetConf = null;
    }
  }

  /** @throws IllegalStateException if no setConfiguration request is pending */
  void failSetConfiguration(RaftException e) {
    Preconditions.checkState(pendingSetConf != null);
    pendingSetConf.setException(e);
    pendingSetConf = null;
  }

  /** @return the transaction of the unanswered request at {@code index}, or null. */
  TransactionContext getTransactionContext(long index) {
    PendingRequest pendingRequest = pendingRequests.get(index);
    // it is possible that the pendingRequest is null if this peer just becomes
    // the new leader and commits transactions received by the previous leader,
    // or if the request has already been replied
    return pendingRequest != null ? pendingRequest.getEntry() : null;
  }

  /**
   * Completes the request at {@code index} with the outcome of
   * {@code messageFuture} once that future finishes.
   */
  void replyPendingRequest(long index, CompletableFuture<Message> messageFuture) {
    // Remove rather than get. This keeps the map from growing without bound,
    // and ensures sendNotLeaderResponses() only sees requests still unanswered.
    final PendingRequest pending = pendingRequests.remove(index);
    if (pending != null) {
      Preconditions.checkState(pending.getIndex() == index);

      messageFuture.whenComplete((reply, exception) -> {
        if (exception == null) {
          pending.setSuccessReply(reply);
        } else {
          pending.setException(exception);
        }
      });
    }
  }

  /**
   * The leader state is stopped. Send NotLeaderException to all the pending
   * requests since they have not got applied to the state machine yet.
   */
  void sendNotLeaderResponses() throws IOException {
    LOG.info("{} sends responses before shutting down PendingRequestsHandler",
        server.getId());

    Collection<TransactionContext> pendingEntries = pendingRequests.values().stream()
        .map(PendingRequest::getEntry).collect(Collectors.toList());
    // notify the state machine about stepping down
    server.getStateMachine().notifyNotLeader(pendingEntries);
    pendingRequests.values().forEach(this::setNotLeaderException);
    if (pendingSetConf != null) {
      setNotLeaderException(pendingSetConf);
    }
  }

  private void setNotLeaderException(PendingRequest pending) {
    RaftClientReply reply = new RaftClientReply(pending.getRequest(),
        server.generateNotLeaderException());
    pending.setReply(reply);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java new file mode 100644 index 0000000..8fdd628 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.ratis.protocol.RaftPeer; +
/**
 * The configuration of the raft cluster.
 *
 * The configuration is stable if there is no on-going peer change. Otherwise,
 * the configuration is transitional, i.e. in the middle of a peer change.
 *
 * The objects of this class are immutable.
 */
public class RaftConfiguration {
  /** Create a {@link Builder}. */
  public static Builder newBuilder() {
    return new Builder();
  }

  /**
   * To build {@link RaftConfiguration} objects.
   *
   * Each field may be set at most once. Building from an existing transitional
   * configuration forces a stable result. Building from an existing stable
   * configuration as the old conf forces a transitional result.
   */
  public static class Builder {
    private PeerConfiguration oldConf;
    private PeerConfiguration conf;
    private long logEntryIndex = RaftServerConstants.INVALID_LOG_INDEX;

    private boolean forceStable = false;
    private boolean forceTransitional = false;

    private Builder() {}

    public Builder setConf(PeerConfiguration conf) {
      Preconditions.checkNotNull(conf);
      Preconditions.checkState(this.conf == null, "conf is already set.");
      this.conf = conf;
      return this;
    }

    public Builder setConf(Iterable<RaftPeer> peers) {
      return setConf(new PeerConfiguration(peers));
    }

    public Builder setConf(RaftPeer[] peers) {
      return setConf(Arrays.asList(peers));
    }

    /** Uses the new conf of a transitional configuration; the result must be stable. */
    Builder setConf(RaftConfiguration transitionalConf) {
      Preconditions.checkNotNull(transitionalConf);
      Preconditions.checkState(transitionalConf.isTransitional());

      Preconditions.checkState(!forceTransitional);
      forceStable = true;
      return setConf(transitionalConf.conf);
    }


    public Builder setOldConf(PeerConfiguration oldConf) {
      Preconditions.checkNotNull(oldConf);
      Preconditions.checkState(this.oldConf == null, "oldConf is already set.");
      this.oldConf = oldConf;
      return this;
    }

    public Builder setOldConf(Iterable<RaftPeer> oldPeers) {
      return setOldConf(new PeerConfiguration(oldPeers));
    }

    public Builder setOldConf(RaftPeer[] oldPeers) {
      return setOldConf(Arrays.asList(oldPeers));
    }

    /** Uses a stable configuration as the old conf; the result must be transitional. */
    Builder setOldConf(RaftConfiguration stableConf) {
      Preconditions.checkNotNull(stableConf);
      Preconditions.checkState(stableConf.isStable());

      Preconditions.checkState(!forceStable);
      forceTransitional = true;
      return setOldConf(stableConf.conf);
    }

    public Builder setLogEntryIndex(long logEntryIndex) {
      Preconditions.checkArgument(
          logEntryIndex != RaftServerConstants.INVALID_LOG_INDEX);
      Preconditions.checkState(
          this.logEntryIndex == RaftServerConstants.INVALID_LOG_INDEX,
          "logEntryIndex is already set.");
      this.logEntryIndex = logEntryIndex;
      return this;
    }

    /** Build a {@link RaftConfiguration}. */
    public RaftConfiguration build() {
      if (forceTransitional) {
        Preconditions.checkState(oldConf != null);
      }
      if (forceStable) {
        Preconditions.checkState(oldConf == null);
      }
      return new RaftConfiguration(conf, oldConf, logEntryIndex);
    }
  }

  /** Non-null only if this configuration is transitional. */
  private final PeerConfiguration oldConf;
  /**
   * The current peer configuration while this configuration is stable;
   * or the new peer configuration while this configuration is transitional.
   */
  private final PeerConfiguration conf;

  /** The index of the corresponding log entry for this configuration. */
  private final long logEntryIndex;

  private RaftConfiguration(PeerConfiguration conf, PeerConfiguration oldConf,
      long logEntryIndex) {
    Preconditions.checkNotNull(conf);
    this.conf = conf;
    this.oldConf = oldConf;
    this.logEntryIndex = logEntryIndex;
  }

  /** Is this configuration transitional, i.e. in the middle of a peer change? */
  boolean isTransitional() {
    return oldConf != null;
  }

  /** Is this configuration stable, i.e. no on-going peer change? */
  boolean isStable() {
    return oldConf == null;
  }

  boolean containsInConf(String peerId) {
    return conf.contains(peerId);
  }

  boolean containsInOldConf(String peerId) {
    return oldConf != null && oldConf.contains(peerId);
  }

  /**
   * @return true if the peer is in the conf and, when transitional, also in
   *         the old conf.
   */
  boolean contains(String peerId) {
    return containsInConf(peerId) && (oldConf == null || containsInOldConf(peerId));
  }

  /**
   * @return the peer corresponding to the given id;
   * or return null if the peer is not in this configuration.
   */
  public RaftPeer getPeer(String id) {
    if (id == null) {
      return null;
    }
    RaftPeer peer = conf.getPeer(id);
    if (peer != null) {
      return peer;
    } else if (oldConf != null) {
      return oldConf.getPeer(id);
    }
    return null;
  }

  /** @return all the peers from the conf, and the old conf if it exists. */
  public Collection<RaftPeer> getPeers() {
    final Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
    if (oldConf != null) {
      oldConf.getPeers().stream().filter(p -> !peers.contains(p))
          .forEach(peers::add);
    }
    return peers;
  }

  /**
   * @return all the peers other than the given self id from the conf,
   * and the old conf if it exists.
   */
  public Collection<RaftPeer> getOtherPeers(String selfId) {
    Collection<RaftPeer> others = conf.getOtherPeers(selfId);
    if (oldConf != null) {
      oldConf.getOtherPeers(selfId).stream()
          .filter(p -> !others.contains(p))
          .forEach(others::add);
    }
    return others;
  }

  /**
   * @return true if the self id together with the others are in the majority
   *         of the conf and, when transitional, also of the old conf.
   */
  boolean hasMajority(Collection<String> others, String selfId) {
    Preconditions.checkArgument(!others.contains(selfId));
    return conf.hasMajority(others, selfId) &&
        (oldConf == null || oldConf.hasMajority(others, selfId));
  }

  @Override
  public String toString() {
    // separate the two peer lists so a transitional configuration reads as
    // "<conf>, old:<oldConf>" instead of running them together
    return conf + (oldConf != null ? ", old:" + oldConf : "");
  }

  /** @return true if this is stable and has exactly the ids of {@code newMembers}. */
  @VisibleForTesting
  boolean hasNoChange(RaftPeer[] newMembers) {
    if (!isStable() || conf.size() != newMembers.length) {
      return false;
    }
    for (RaftPeer peer : newMembers) {
      if (!conf.contains(peer.getId())) {
        return false;
      }
    }
    return true;
  }

  long getLogEntryIndex() {
    return logEntryIndex;
  }

  /** @return the members of {@code newMembers} that are not in {@code old}'s conf. */
  static Collection<RaftPeer> computeNewPeers(RaftPeer[] newMembers,
      RaftConfiguration old) {
    List<RaftPeer> peers = new ArrayList<>();
    for (RaftPeer p : newMembers) {
      if (!old.containsInConf(p.getId())) {
        peers.add(p);
      }
    }
    return peers;
  }

  /**
   * @return a uniformly random peer of the conf other than {@code exclusiveId},
   *         or null if there is none.
   */
  RaftPeer getRandomPeer(String exclusiveId) {
    final List<RaftPeer> peers = conf.getOtherPeers(exclusiveId);
    if (peers.isEmpty()) {
      return null;
    }
    final int index = ThreadLocalRandom.current().nextInt(peers.size());
    return peers.get(index);
  }

  Collection<RaftPeer> getPeersInOldConf() {
    return oldConf != null ? oldConf.getPeers() : Collections.emptyList();
  }

  Collection<RaftPeer> getPeersInConf() {
    return conf.getPeers();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java new file mode 100644 index 0000000..caf9c4d --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.client.RaftClient; + +public interface RaftServerConstants { + long INVALID_LOG_INDEX = -1; + byte LOG_TERMINATE_BYTE = 0; + long DEFAULT_SEQNUM = RaftClient.DEFAULT_SEQNUM; + + enum StartupOption { + FORMAT("format"), + REGULAR("regular"); + + private final String option; + + StartupOption(String arg) { + this.option = arg; + } + + public static StartupOption getOption(String arg) { + for (StartupOption s : StartupOption.values()) { + if (s.option.equals(arg)) { + return s; + } + } + return REGULAR; + } + } +}
