http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java new file mode 100644 index 0000000..ad6ecef --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java @@ -0,0 +1,129 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.raft.server.impl;

import com.google.common.base.Preconditions;
import org.apache.raft.protocol.*;
import org.apache.raft.statemachine.TransactionContext;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
 * Book-keeping for client requests that have been added to the log but have
 * not been answered yet, plus at most one outstanding setConfiguration
 * request.
 *
 * Ordinary requests are keyed by the log index given to
 * {@link #addPendingRequest}; an entry stays in the map until
 * {@link #replyPendingRequest} takes it out.
 *
 * The fields {@code pendingSetConf} and {@code last} are plain (non-volatile)
 * fields; as noted in {@link #addPendingRequest} the callers are expected to
 * provide the synchronization.
 */
class PendingRequests {
  private static final Logger LOG = RaftServer.LOG;

  /** The outstanding setConfiguration request, or null if there is none. */
  private PendingRequest pendingSetConf;
  private final RaftServer server;
  /** Unanswered requests, keyed by the log index they were added with. */
  private final ConcurrentMap<Long, PendingRequest> pendingRequests = new ConcurrentHashMap<>();
  /** The request added most recently; only used to check index continuity. */
  private PendingRequest last = null;

  PendingRequests(RaftServer server) {
    this.server = server;
  }

  /**
   * Register a non-read-only request under the given log index.
   *
   * @param index the log index of the request; must be exactly one larger than
   *              the index of the previously added request, if there is one.
   * @param request the client request; must not be read-only.
   * @param entry the transaction context stored together with the request.
   * @return the newly created pending request.
   * @throws IllegalArgumentException if the request is read-only.
   * @throws IllegalStateException if the index does not directly follow the
   *                               previously added one.
   */
  PendingRequest addPendingRequest(long index, RaftClientRequest request,
      TransactionContext entry) {
    // externally synced for now
    Preconditions.checkArgument(!request.isReadOnly());
    Preconditions.checkState(last == null || index == last.getIndex() + 1);
    return add(index, request, entry);
  }

  /** Create the pending request, store it in the map and remember it as last. */
  private PendingRequest add(long index, RaftClientRequest request,
      TransactionContext entry) {
    final PendingRequest pending = new PendingRequest(index, request, entry);
    pendingRequests.put(index, pending);
    last = pending;
    return pending;
  }

  /**
   * Register the setConfiguration request.
   *
   * @return the newly created pending request.
   * @throws IllegalStateException if another setConfiguration request is
   *                               already outstanding.
   */
  PendingRequest addConfRequest(SetConfigurationRequest request) {
    Preconditions.checkState(pendingSetConf == null);
    pendingSetConf = new PendingRequest(request);
    return pendingSetConf;
  }

  /**
   * Answer the outstanding setConfiguration request, if any, with a success
   * reply and forget it.
   */
  void replySetConfiguration() {
    // we allow the pendingRequest to be null in case that the new leader
    // commits the new configuration while it has not received the retry
    // request from the client
    if (pendingSetConf != null) {
      // for setConfiguration we do not need to wait for statemachine. send back
      // reply after it's committed.
      pendingSetConf.setSuccessReply(null);
      pendingSetConf = null;
    }
  }

  /**
   * Fail the outstanding setConfiguration request with the given exception and
   * forget it.
   *
   * @throws IllegalStateException if there is no outstanding setConfiguration
   *                               request.
   */
  void failSetConfiguration(RaftException e) {
    Preconditions.checkState(pendingSetConf != null);
    pendingSetConf.setException(e);
    pendingSetConf = null;
  }

  /**
   * @return the transaction context stored for the given index;
   *         or null if there is no pending request at that index.
   */
  TransactionContext getTransactionContext(long index) {
    PendingRequest pendingRequest = pendingRequests.get(index);
    // it is possible that the pendingRequest is null if this peer just becomes
    // the new leader and commits transactions received by the previous leader
    return pendingRequest != null ? pendingRequest.getEntry() : null;
  }

  /**
   * Hand the pending request at the given index over to the given future:
   * when the future completes normally the request gets a success reply,
   * when it completes exceptionally the request gets that exception.
   *
   * The request is taken out of the pending map by this call. Does nothing if
   * there is no pending request at the index.
   *
   * @param index the log index of the request.
   * @param messageFuture the future whose outcome decides the reply.
   */
  void replyPendingRequest(long index, CompletableFuture<Message> messageFuture) {
    // Remove instead of get: nothing else in this class takes entries out of
    // the map, so with get() every answered request would stay referenced for
    // the lifetime of this object and would be handed to the state machine and
    // to setNotLeaderException() again by sendNotLeaderResponses().
    final PendingRequest pending = pendingRequests.remove(index);
    if (pending != null) {
      Preconditions.checkState(pending.getIndex() == index);

      messageFuture.whenComplete((reply, exception) -> {
        if (exception == null) {
          pending.setSuccessReply(reply);
        } else {
          pending.setException(exception);
        }
      });
    }
  }

  /**
   * The leader state is stopped. Send NotLeaderException to all the pending
   * requests since they have not got applied to the state machine yet.
   *
   * The state machine is notified first with the transaction contexts of all
   * requests still in the pending map; requests already handed over through
   * {@link #replyPendingRequest} are not included. The outstanding
   * setConfiguration request, if any, gets the same NotLeaderException reply.
   */
  void sendNotLeaderResponses() throws IOException {
    LOG.info("{} sends responses before shutting down PendingRequestsHandler",
        server.getId());

    Collection<TransactionContext> pendingEntries = pendingRequests.values().stream()
        .map(PendingRequest::getEntry).collect(Collectors.toList());
    // notify the state machine about stepping down
    server.getStateMachine().notifyNotLeader(pendingEntries);
    pendingRequests.values().forEach(this::setNotLeaderException);
    if (pendingSetConf != null) {
      setNotLeaderException(pendingSetConf);
    }
  }

  /** Reply to the given request with the server's NotLeaderException. */
  private void setNotLeaderException(PendingRequest pending) {
    RaftClientReply reply = new RaftClientReply(pending.getRequest(),
        server.generateNotLeaderException());
    pending.setReply(reply);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java new file mode 100644 index 0000000..28ff330 --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftConfiguration.java @@ -0,0 +1,261 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.raft.server.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.raft.protocol.RaftPeer;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

/**
 * The configuration of the raft cluster.
 *
 * The configuration is stable if there is no on-going peer change. Otherwise,
 * the configuration is transitional, i.e. in the middle of a peer change.
 *
 * The objects of this class are immutable.
 */
public class RaftConfiguration {
  /** Create a {@link Builder}. */
  public static Builder newBuilder() {
    return new Builder();
  }

  /**
   * To build {@link RaftConfiguration} objects.
   *
   * Each of conf, oldConf and logEntryIndex can be set at most once; setting
   * one of them a second time fails with {@link IllegalStateException}.
   */
  public static class Builder {
    private PeerConfiguration oldConf;
    private PeerConfiguration conf;
    private long logEntryIndex = RaftServerConstants.INVALID_LOG_INDEX;

    /** Set by {@link #setConf(RaftConfiguration)}: build() must see no oldConf. */
    private boolean forceStable = false;
    /** Set by {@link #setOldConf(RaftConfiguration)}: build() must see an oldConf. */
    private boolean forceTransitional = false;

    private Builder() {}

    /**
     * Set the (new) peer configuration.
     *
     * @throws NullPointerException if conf is null.
     * @throws IllegalStateException if conf has already been set.
     */
    public Builder setConf(PeerConfiguration conf) {
      Preconditions.checkNotNull(conf);
      Preconditions.checkState(this.conf == null, "conf is already set.");
      this.conf = conf;
      return this;
    }

    /** Same as {@link #setConf(PeerConfiguration)} with a configuration made of the given peers. */
    public Builder setConf(Iterable<RaftPeer> peers) {
      return setConf(new PeerConfiguration(peers));
    }

    /** Same as {@link #setConf(Iterable)} for an array of peers. */
    public Builder setConf(RaftPeer[] peers) {
      return setConf(Arrays.asList(peers));
    }

    /**
     * Use the conf of the given transitional configuration as the conf of the
     * configuration being built, and require the result to be stable.
     *
     * @throws IllegalStateException if the given configuration is not
     *         transitional, or if {@link #setOldConf(RaftConfiguration)} has
     *         been called on this builder.
     */
    Builder setConf(RaftConfiguration transitionalConf) {
      Preconditions.checkNotNull(transitionalConf);
      Preconditions.checkState(transitionalConf.isTransitional());

      Preconditions.checkState(!forceTransitional);
      forceStable = true;
      return setConf(transitionalConf.conf);
    }


    /**
     * Set the old peer configuration, which makes the result transitional.
     *
     * @throws NullPointerException if oldConf is null.
     * @throws IllegalStateException if oldConf has already been set.
     */
    public Builder setOldConf(PeerConfiguration oldConf) {
      Preconditions.checkNotNull(oldConf);
      Preconditions.checkState(this.oldConf == null, "oldConf is already set.");
      this.oldConf = oldConf;
      return this;
    }

    /** Same as {@link #setOldConf(PeerConfiguration)} with a configuration made of the given peers. */
    public Builder setOldConf(Iterable<RaftPeer> oldPeers) {
      return setOldConf(new PeerConfiguration(oldPeers));
    }

    /** Same as {@link #setOldConf(Iterable)} for an array of peers. */
    public Builder setOldConf(RaftPeer[] oldPeers) {
      return setOldConf(Arrays.asList(oldPeers));
    }

    /**
     * Use the conf of the given stable configuration as the old conf of the
     * configuration being built, and require the result to be transitional.
     *
     * @throws IllegalStateException if the given configuration is not stable,
     *         or if {@link #setConf(RaftConfiguration)} has been called on
     *         this builder.
     */
    Builder setOldConf(RaftConfiguration stableConf) {
      Preconditions.checkNotNull(stableConf);
      Preconditions.checkState(stableConf.isStable());

      Preconditions.checkState(!forceStable);
      forceTransitional = true;
      return setOldConf(stableConf.conf);
    }

    /**
     * Set the index of the log entry corresponding to this configuration.
     *
     * @throws IllegalArgumentException if the index is
     *         {@link RaftServerConstants#INVALID_LOG_INDEX}.
     * @throws IllegalStateException if the index has already been set.
     */
    public Builder setLogEntryIndex(long logEntryIndex) {
      Preconditions.checkArgument(
          logEntryIndex != RaftServerConstants.INVALID_LOG_INDEX);
      Preconditions.checkState(
          this.logEntryIndex == RaftServerConstants.INVALID_LOG_INDEX,
          "logEntryIndex is already set.");
      this.logEntryIndex = logEntryIndex;
      return this;
    }

    /**
     * Build a {@link RaftConfiguration}.
     *
     * @throws IllegalStateException if the result was forced to be
     *         transitional but no oldConf is set, or forced to be stable but
     *         an oldConf is set.
     * @throws NullPointerException if conf has not been set.
     */
    public RaftConfiguration build() {
      if (forceTransitional) {
        Preconditions.checkState(oldConf != null);
      }
      if (forceStable) {
        Preconditions.checkState(oldConf == null);
      }
      return new RaftConfiguration(conf, oldConf, logEntryIndex);
    }
  }

  /** Non-null only if this configuration is transitional. */
  private final PeerConfiguration oldConf;
  /**
   * The current peer configuration while this configuration is stable;
   * or the new peer configuration while this configuration is transitional.
   */
  private final PeerConfiguration conf;

  /** The index of the corresponding log entry for this configuration. */
  private final long logEntryIndex;

  private RaftConfiguration(PeerConfiguration conf, PeerConfiguration oldConf,
      long logEntryIndex) {
    Preconditions.checkNotNull(conf);
    this.conf = conf;
    this.oldConf = oldConf;
    this.logEntryIndex = logEntryIndex;
  }

  /** Is this configuration transitional, i.e. in the middle of a peer change? */
  public boolean isTransitional() {
    return oldConf != null;
  }

  /** Is this configuration stable, i.e. no on-going peer change? */
  public boolean isStable() {
    return oldConf == null;
  }

  /** @return true if the given peer id is in the (new) conf. */
  boolean containsInConf(String peerId) {
    return conf.contains(peerId);
  }

  /** @return true if there is an old conf and the given peer id is in it. */
  boolean containsInOldConf(String peerId) {
    return oldConf != null && oldConf.contains(peerId);
  }

  /**
   * @return true if the given peer id is in the conf and, when this
   *         configuration is transitional, also in the old conf.
   */
  public boolean contains(String peerId) {
    return containsInConf(peerId) && (oldConf == null || containsInOldConf(peerId));
  }

  /**
   * @return the peer corresponding to the given id;
   *         or return null if the peer is not in this configuration.
   */
  public RaftPeer getPeer(String id) {
    if (id == null) {
      return null;
    }
    RaftPeer peer = conf.getPeer(id);
    if (peer != null) {
      return peer;
    } else if (oldConf != null) {
      return oldConf.getPeer(id);
    }
    return null;
  }

  /** @return all the peers from the conf, and the old conf if it exists. */
  public Collection<RaftPeer> getPeers() {
    final Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
    if (oldConf != null) {
      oldConf.getPeers().stream().filter(p -> !peers.contains(p))
          .forEach(peers::add);
    }
    return peers;
  }

  /**
   * @return all the peers other than the given self id from the conf,
   *         and the old conf if it exists.
   */
  public Collection<RaftPeer> getOtherPeers(String selfId) {
    // Copy before merging, as getPeers() does: the old-conf peers are added to
    // this collection, and the list handed out by PeerConfiguration must not
    // be modified by that.
    final Collection<RaftPeer> others = new ArrayList<>(conf.getOtherPeers(selfId));
    if (oldConf != null) {
      oldConf.getOtherPeers(selfId).stream()
          .filter(p -> !others.contains(p))
          .forEach(others::add);
    }
    return others;
  }

  /**
   * @return true if the self id together with the others are in the majority
   *         of the conf and, when transitional, also of the old conf.
   * @throws IllegalArgumentException if others contains the self id.
   */
  public boolean hasMajority(Collection<String> others, String selfId) {
    Preconditions.checkArgument(!others.contains(selfId));
    return conf.hasMajority(others, selfId) &&
        (oldConf == null || oldConf.hasMajority(others, selfId));
  }

  @Override
  public String toString() {
    return conf + (oldConf != null ? "old:" + oldConf : "");
  }

  /**
   * @return true if this configuration is stable and its conf has the same
   *         size as the given array and contains the id of every given peer.
   */
  @VisibleForTesting
  boolean hasNoChange(RaftPeer[] newMembers) {
    if (!isStable() || conf.size() != newMembers.length) {
      return false;
    }
    for (RaftPeer peer : newMembers) {
      if (!conf.contains(peer.getId())) {
        return false;
      }
    }
    return true;
  }

  long getLogEntryIndex() {
    return logEntryIndex;
  }

  /** @return the given peers whose ids are not in the conf of the old configuration. */
  static Collection<RaftPeer> computeNewPeers(RaftPeer[] newMembers,
      RaftConfiguration old) {
    List<RaftPeer> peers = new ArrayList<>();
    for (RaftPeer p : newMembers) {
      if (!old.containsInConf(p.getId())) {
        peers.add(p);
      }
    }
    return peers;
  }

  /**
   * @return a randomly chosen peer of the conf other than the given id;
   *         or null if there is no such peer.
   */
  RaftPeer getRandomPeer(String exclusiveId) {
    final List<RaftPeer> peers = conf.getOtherPeers(exclusiveId);
    if (peers.isEmpty()) {
      return null;
    }
    final int index = ThreadLocalRandom.current().nextInt(peers.size());
    return peers.get(index);
  }

  /** @return the peers of the old conf; or an empty list if this configuration is stable. */
  public Collection<RaftPeer> getPeersInOldConf() {
    return oldConf != null ? oldConf.getPeers() : Collections.emptyList();
  }

  /** @return the peers of the conf. */
  public Collection<RaftPeer> getPeersInConf() {
    return conf.getPeers();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java new file mode 100644 index 0000000..c1bf4a9 --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java @@ -0,0 +1,749 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.raft.server.impl; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.raft.conf.RaftProperties; +import org.apache.raft.protocol.*; +import org.apache.raft.server.RaftServerConfigKeys; +import org.apache.raft.server.protocol.RaftServerProtocol; +import org.apache.raft.server.protocol.TermIndex; +import org.apache.raft.server.storage.FileInfo; +import org.apache.raft.shaded.proto.RaftProtos.*; +import org.apache.raft.statemachine.SnapshotInfo; +import org.apache.raft.statemachine.StateMachine; +import org.apache.raft.statemachine.TransactionContext; +import org.apache.raft.util.CodeInjectionForTesting; +import org.apache.raft.util.LifeCycle; +import org.apache.raft.util.ProtoUtils; +import org.apache.raft.util.RaftUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; + +import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; +import static org.apache.raft.util.LifeCycle.State.*; + +public class RaftServer implements RaftServerProtocol, Closeable { + public static final Logger LOG = LoggerFactory.getLogger(RaftServer.class); + + private static final String CLASS_NAME = RaftServer.class.getSimpleName(); + static final String REQUEST_VOTE = CLASS_NAME + 
".requestVote"; + static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries"; + static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot"; + + + private final int minTimeoutMs; + private final int maxTimeoutMs; + + private final LifeCycle lifeCycle; + private final ServerState state; + private final StateMachine stateMachine; + private final RaftProperties properties; + private volatile Role role; + + /** used when the peer is follower, to monitor election timeout */ + private volatile FollowerState heartbeatMonitor; + + /** used when the peer is candidate, to request votes from other peers */ + private volatile LeaderElection electionDaemon; + + /** used when the peer is leader */ + private volatile LeaderState leaderState; + + private RaftServerRpc serverRpc; + + private final LogAppenderFactory appenderFactory; + + public RaftServer(String id, RaftConfiguration raftConf, + RaftProperties properties, StateMachine stateMachine) throws IOException { + this.lifeCycle = new LifeCycle(id); + minTimeoutMs = properties.getInt( + RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY, + RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT); + maxTimeoutMs = properties.getInt( + RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY, + RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT); + Preconditions.checkArgument(maxTimeoutMs > minTimeoutMs, + "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs); + this.properties = properties; + this.stateMachine = stateMachine; + this.state = new ServerState(id, raftConf, properties, this, stateMachine); + appenderFactory = initAppenderFactory(); + } + + public int getMinTimeoutMs() { + return minTimeoutMs; + } + + public int getMaxTimeoutMs() { + return maxTimeoutMs; + } + + public int getRandomTimeoutMs() { + return RaftUtils.getRandomBetween(minTimeoutMs, maxTimeoutMs); + } + + public StateMachine getStateMachine() { + return this.stateMachine; + } + + public LogAppenderFactory 
getLogAppenderFactory() { + return appenderFactory; + } + + private LogAppenderFactory initAppenderFactory() { + Class<? extends LogAppenderFactory> factoryClass = properties.getClass( + RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, + RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT, + LogAppenderFactory.class); + return RaftUtils.newInstance(factoryClass); + } + + /** + * Used by tests to set initial raft configuration with correct port bindings. + */ + @VisibleForTesting + public void setInitialConf(RaftConfiguration conf) { + this.state.setInitialConf(conf); + } + + public void setServerRpc(RaftServerRpc serverRpc) { + this.serverRpc = serverRpc; + // add peers into rpc service + RaftConfiguration conf = getRaftConf(); + if (conf != null) { + addPeersToRPC(conf.getPeers()); + } + } + + public RaftServerRpc getServerRpc() { + return serverRpc; + } + + public void start() { + lifeCycle.transition(STARTING); + state.start(); + RaftConfiguration conf = getRaftConf(); + if (conf != null && conf.contains(getId())) { + LOG.debug("{} starts as a follower", getId()); + startAsFollower(); + } else { + LOG.debug("{} starts with initializing state", getId()); + startInitializing(); + } + } + + /** + * The peer belongs to the current configuration, should start as a follower + */ + private void startAsFollower() { + role = Role.FOLLOWER; + heartbeatMonitor = new FollowerState(this); + heartbeatMonitor.start(); + + serverRpc.start(); + lifeCycle.transition(RUNNING); + } + + /** + * The peer does not have any configuration (maybe it will later be included + * in some configuration). Start still as a follower but will not vote or + * start election. 
+ */ + private void startInitializing() { + role = Role.FOLLOWER; + // do not start heartbeatMonitoring + serverRpc.start(); + } + + public ServerState getState() { + return this.state; + } + + public String getId() { + return getState().getSelfId(); + } + + public RaftConfiguration getRaftConf() { + return getState().getRaftConf(); + } + + @Override + public void close() { + lifeCycle.checkStateAndClose(() -> { + try { + shutdownHeartbeatMonitor(); + shutdownElectionDaemon(); + shutdownLeaderState(); + + serverRpc.shutdown(); + state.close(); + } catch (Exception ignored) { + LOG.warn("Failed to kill " + state.getSelfId(), ignored); + } + }); + } + + public boolean isAlive() { + return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED); + } + + public boolean isFollower() { + return role == Role.FOLLOWER; + } + + public boolean isCandidate() { + return role == Role.CANDIDATE; + } + + public boolean isLeader() { + return role == Role.LEADER; + } + + Role getRole() { + return role; + } + + /** + * Change the server state to Follower if necessary + * @param newTerm The new term. + * @param sync We will call {@link ServerState#persistMetadata()} if this is + * set to true and term/votedFor get updated. + * @return if the term/votedFor should be updated to the new term + * @throws IOException if term/votedFor persistence failed. 
+ */ + synchronized boolean changeToFollower(long newTerm, boolean sync) + throws IOException { + final Role old = role; + role = Role.FOLLOWER; + + boolean metadataUpdated = false; + if (newTerm > state.getCurrentTerm()) { + state.setCurrentTerm(newTerm); + state.resetLeaderAndVotedFor(); + metadataUpdated = true; + } + + if (old == Role.LEADER) { + assert leaderState != null; + shutdownLeaderState(); + } else if (old == Role.CANDIDATE) { + shutdownElectionDaemon(); + } + + if (old != Role.FOLLOWER) { + heartbeatMonitor = new FollowerState(this); + heartbeatMonitor.start(); + } + + if (metadataUpdated && sync) { + state.persistMetadata(); + } + return metadataUpdated; + } + + private synchronized void shutdownLeaderState() { + final LeaderState leader = leaderState; + if (leader != null) { + leader.stop(); + } + leaderState = null; + // TODO: make sure that StateMachineUpdater has applied all transactions that have context + } + + private void shutdownElectionDaemon() { + final LeaderElection election = electionDaemon; + if (election != null) { + election.stopRunning(); + // no need to interrupt the election thread + } + electionDaemon = null; + } + + synchronized void changeToLeader() { + Preconditions.checkState(isCandidate()); + shutdownElectionDaemon(); + role = Role.LEADER; + state.becomeLeader(); + // start sending AppendEntries RPC to followers + leaderState = new LeaderState(this, properties); + leaderState.start(); + } + + private void shutdownHeartbeatMonitor() { + final FollowerState hm = heartbeatMonitor; + if (hm != null) { + hm.stopRunning(); + hm.interrupt(); + } + heartbeatMonitor = null; + } + + synchronized void changeToCandidate() { + Preconditions.checkState(isFollower()); + shutdownHeartbeatMonitor(); + role = Role.CANDIDATE; + // start election + electionDaemon = new LeaderElection(this); + electionDaemon.start(); + } + + @Override + public String toString() { + return role + " " + state + " " + lifeCycle.getCurrentState(); + } + + /** + * 
@return null if the server is in leader state. + */ + CompletableFuture<RaftClientReply> checkLeaderState( + RaftClientRequest request) { + if (!isLeader()) { + NotLeaderException exception = generateNotLeaderException(); + CompletableFuture<RaftClientReply> future = new CompletableFuture<>(); + future.complete(new RaftClientReply(request, exception)); + return future; + } + return null; + } + + NotLeaderException generateNotLeaderException() { + if (lifeCycle.getCurrentState() != RUNNING) { + return new NotLeaderException(getId(), null, null); + } + String leaderId = state.getLeaderId(); + if (leaderId == null || leaderId.equals(state.getSelfId())) { + // No idea about who is the current leader. Or the peer is the current + // leader, but it is about to step down + RaftPeer suggestedLeader = state.getRaftConf() + .getRandomPeer(state.getSelfId()); + leaderId = suggestedLeader == null ? null : suggestedLeader.getId(); + } + RaftConfiguration conf = getRaftConf(); + Collection<RaftPeer> peers = conf.getPeers(); + return new NotLeaderException(getId(), conf.getPeer(leaderId), + peers.toArray(new RaftPeer[peers.size()])); + } + + /** + * Handle a normal update request from client. 
+ */ + public CompletableFuture<RaftClientReply> appendTransaction( + RaftClientRequest request, TransactionContext entry) + throws RaftException { + LOG.debug("{}: receive client request({})", getId(), request); + lifeCycle.assertCurrentState(RUNNING); + CompletableFuture<RaftClientReply> reply; + + final PendingRequest pending; + synchronized (this) { + reply = checkLeaderState(request); + if (reply != null) { + return reply; + } + + // append the message to its local log + final long entryIndex; + try { + entryIndex = state.applyLog(entry); + } catch (IOException e) { + throw new RaftException(e); + } + + // put the request into the pending queue + pending = leaderState.addPendingRequest(entryIndex, request, entry); + leaderState.notifySenders(); + } + return pending.getFuture(); + } + + /** + * Handle a raft configuration change request from client. + */ + public CompletableFuture<RaftClientReply> setConfiguration( + SetConfigurationRequest request) throws IOException { + LOG.debug("{}: receive setConfiguration({})", getId(), request); + lifeCycle.assertCurrentState(RUNNING); + CompletableFuture<RaftClientReply> reply = checkLeaderState(request); + if (reply != null) { + return reply; + } + + final RaftPeer[] peersInNewConf = request.getPeersInNewConf(); + final PendingRequest pending; + synchronized (this) { + reply = checkLeaderState(request); + if (reply != null) { + return reply; + } + + final RaftConfiguration current = getRaftConf(); + // make sure there is no other raft reconfiguration in progress + if (!current.isStable() || leaderState.inStagingState() || + !state.isCurrentConfCommitted()) { + throw new ReconfigurationInProgressException( + "Reconfiguration is already in progress: " + current); + } + + // return true if the new configuration is the same with the current one + if (current.hasNoChange(peersInNewConf)) { + pending = leaderState.returnNoConfChange(request); + return pending.getFuture(); + } + + // add new peers into the rpc service + 
addPeersToRPC(Arrays.asList(peersInNewConf)); + // add staging state into the leaderState + pending = leaderState.startSetConfiguration(request); + } + return pending.getFuture(); + } + + private boolean shouldWithholdVotes() { + return isLeader() || (isFollower() && state.hasLeader() + && heartbeatMonitor.shouldWithholdVotes()); + } + + /** + * check if the remote peer is not included in the current conf + * and should shutdown. should shutdown if all the following stands: + * 1. this is a leader + * 2. current conf is stable and has been committed + * 3. candidate id is not included in conf + * 4. candidate's last entry's index < conf's index + */ + private boolean shouldSendShutdown(String candidateId, + TermIndex candidateLastEntry) { + return isLeader() + && getRaftConf().isStable() + && getState().isConfCommitted() + && !getRaftConf().containsInConf(candidateId) + && candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex() + && !leaderState.isBootStrappingPeer(candidateId); + } + + @Override + public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) + throws IOException { + final String candidateId = r.getServerRequest().getRequestorId(); + return requestVote(candidateId, r.getCandidateTerm(), + ServerProtoUtils.toTermIndex(r.getCandidateLastEntry())); + } + + private RequestVoteReplyProto requestVote(String candidateId, + long candidateTerm, TermIndex candidateLastEntry) throws IOException { + CodeInjectionForTesting.execute(REQUEST_VOTE, getId(), + candidateId, candidateTerm, candidateLastEntry); + LOG.debug("{}: receive requestVote({}, {}, {})", + getId(), candidateId, candidateTerm, candidateLastEntry); + lifeCycle.assertCurrentState(RUNNING); + + boolean voteGranted = false; + boolean shouldShutdown = false; + final RequestVoteReplyProto reply; + synchronized (this) { + if (shouldWithholdVotes()) { + LOG.info("{} Withhold vote from server {} with term {}. 
" + + "This server:{}, last rpc time from leader {} is {}", getId(), + candidateId, candidateTerm, this, this.getState().getLeaderId(), + (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1)); + } else if (state.recognizeCandidate(candidateId, candidateTerm)) { + boolean termUpdated = changeToFollower(candidateTerm, false); + // see Section 5.4.1 Election restriction + if (state.isLogUpToDate(candidateLastEntry)) { + heartbeatMonitor.updateLastRpcTime(false); + state.grantVote(candidateId); + voteGranted = true; + } + if (termUpdated || voteGranted) { + state.persistMetadata(); // sync metafile + } + } + if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) { + shouldShutdown = true; + } + reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(), + voteGranted, state.getCurrentTerm(), shouldShutdown); + if (LOG.isDebugEnabled()) { + LOG.debug("{} replies to vote request: {}. Peer's state: {}", + getId(), ProtoUtils.toString(reply), state); + } + } + return reply; + } + + private void validateEntries(long expectedTerm, TermIndex previous, + LogEntryProto... 
entries) { + if (entries != null && entries.length > 0) { + final long index0 = entries[0].getIndex(); + + if (previous == null || previous.getTerm() == 0) { + Preconditions.checkArgument(index0 == 0, + "Unexpected Index: previous is null but entries[%s].getIndex()=%s", + 0, index0); + } else { + Preconditions.checkArgument(previous.getIndex() == index0 - 1, + "Unexpected Index: previous is %s but entries[%s].getIndex()=%s", + previous, 0, index0); + } + + for (int i = 0; i < entries.length; i++) { + final long t = entries[i].getTerm(); + Preconditions.checkArgument(expectedTerm >= t, + "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s", + i, t, expectedTerm); + + final long indexi = entries[i].getIndex(); + Preconditions.checkArgument(indexi == index0 + i, + "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s", + i, indexi, index0); + } + } + } + + @Override + public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r) + throws IOException { + // TODO avoid converting list to array + final LogEntryProto[] entries = r.getEntriesList() + .toArray(new LogEntryProto[r.getEntriesCount()]); + final TermIndex previous = r.hasPreviousLog() ? + ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null; + return appendEntries(r.getServerRequest().getRequestorId(), + r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(), + entries); + } + + private AppendEntriesReplyProto appendEntries(String leaderId, long leaderTerm, + TermIndex previous, long leaderCommit, boolean initializing, + LogEntryProto... 
entries) throws IOException { + CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), + leaderId, leaderTerm, previous, leaderCommit, initializing, entries); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(), + leaderId, leaderTerm, previous, leaderCommit, initializing, + ServerProtoUtils.toString(entries)); + } + lifeCycle.assertCurrentState(STARTING, RUNNING); + + try { + validateEntries(leaderTerm, previous, entries); + } catch (IllegalArgumentException e) { + throw new IOException(e); + } + + final long currentTerm; + long nextIndex = state.getLog().getNextIndex(); + synchronized (this) { + final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); + currentTerm = state.getCurrentTerm(); + if (!recognized) { + final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( + leaderId, getId(), currentTerm, nextIndex, NOT_LEADER); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: do not recognize leader. Reply: {}", + getId(), ProtoUtils.toString(reply)); + } + return reply; + } + changeToFollower(leaderTerm, true); + state.setLeader(leaderId); + + if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) { + heartbeatMonitor = new FollowerState(this); + heartbeatMonitor.start(); + } + if (lifeCycle.getCurrentState() == RUNNING) { + heartbeatMonitor.updateLastRpcTime(true); + } + + // We need to check if "previous" is in the local peer. Note that it is + // possible that "previous" is covered by the latest snapshot: e.g., + // it's possible there's no log entries outside of the latest snapshot. + // However, it is not possible that "previous" index is smaller than the + // last index included in snapshot. This is because indices <= snapshot's + // last index should have been committed. 
+ if (previous != null && !containPrevious(previous)) { + final AppendEntriesReplyProto reply = + ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), + currentTerm, Math.min(nextIndex, previous.getIndex()), INCONSISTENCY); + LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}", + getId(), previous, ServerProtoUtils.toString(reply)); + return reply; + } + + state.getLog().append(entries); + state.updateConfiguration(entries); + state.updateStatemachine(leaderCommit, currentTerm); + } + if (entries != null && entries.length > 0) { + try { + state.getLog().logSync(); + } catch (InterruptedException e) { + throw new InterruptedIOException("logSync got interrupted"); + } + nextIndex = entries[entries.length - 1].getIndex() + 1; + } + synchronized (this) { + if (lifeCycle.getCurrentState() == RUNNING && isFollower() + && getState().getCurrentTerm() == currentTerm) { + // reset election timer to avoid punishing the leader for our own + // long disk writes + heartbeatMonitor.updateLastRpcTime(false); + } + } + final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( + leaderId, getId(), currentTerm, nextIndex, SUCCESS); + LOG.debug("{}: succeeded to handle AppendEntries. 
Reply: {}", getId(), + ServerProtoUtils.toString(reply)); + return reply; + } + + private boolean containPrevious(TermIndex previous) { + LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}", + getId(), previous, state.getLatestSnapshot(), state.getLatestInstalledSnapshot()); + return state.getLog().contains(previous) + || (state.getLatestSnapshot() != null + && state.getLatestSnapshot().getTermIndex().equals(previous)) + || (state.getLatestInstalledSnapshot() != null) + && state.getLatestInstalledSnapshot().equals(previous); + } + + @Override + public InstallSnapshotReplyProto installSnapshot( + InstallSnapshotRequestProto request) throws IOException { + final String leaderId = request.getServerRequest().getRequestorId(); + CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, request); + LOG.debug("{}: receive installSnapshot({})", getId(), request); + + lifeCycle.assertCurrentState(STARTING, RUNNING); + + final long currentTerm; + final long leaderTerm = request.getLeaderTerm(); + final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex( + request.getTermIndex()); + final long lastIncludedIndex = lastTermIndex.getIndex(); + synchronized (this) { + final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); + currentTerm = state.getCurrentTerm(); + if (!recognized) { + final InstallSnapshotReplyProto reply = ServerProtoUtils + .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm, + request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); + LOG.debug("{}: do not recognize leader for installing snapshot." + + " Reply: {}", getId(), reply); + return reply; + } + changeToFollower(leaderTerm, true); + state.setLeader(leaderId); + + if (lifeCycle.getCurrentState() == RUNNING) { + heartbeatMonitor.updateLastRpcTime(true); + } + + // Check and append the snapshot chunk. 
We simply put this in lock + // considering a follower peer requiring a snapshot installation does not + // have a lot of requests + Preconditions.checkState( + state.getLog().getNextIndex() <= lastIncludedIndex, + "%s log's next id is %s, last included index in snapshot is %s", + getId(), state.getLog().getNextIndex(), lastIncludedIndex); + + //TODO: We should only update State with installed snapshot once the request is done. + state.installSnapshot(request); + + // update the committed index + // re-load the state machine if this is the last chunk + if (request.getDone()) { + state.reloadStateMachine(lastIncludedIndex, leaderTerm); + } + if (lifeCycle.getCurrentState() == RUNNING) { + heartbeatMonitor.updateLastRpcTime(false); + } + } + if (request.getDone()) { + LOG.info("{}: successfully install the whole snapshot-{}", getId(), + lastIncludedIndex); + } + return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(), + currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS); + } + + AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm, + String targetId, TermIndex previous, List<LogEntryProto> entries, + boolean initializing) { + return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId, + leaderTerm, entries, state.getLog().getLastCommittedIndex(), + initializing, previous); + } + + synchronized InstallSnapshotRequestProto createInstallSnapshotRequest( + String targetId, String requestId, int requestIndex, SnapshotInfo snapshot, + List<FileChunkProto> chunks, boolean done) { + OptionalLong totalSize = snapshot.getFiles().stream() + .mapToLong(FileInfo::getFileSize).reduce(Long::sum); + assert totalSize.isPresent(); + return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId, + requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(), + chunks, totalSize.getAsLong(), done); + } + + synchronized RequestVoteRequestProto createRequestVoteRequest(String targetId, + long term, 
TermIndex lastEntry) { + return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term, + lastEntry); + } + + public synchronized void submitLocalSyncEvent() { + if (isLeader() && leaderState != null) { + leaderState.submitUpdateStateEvent(LeaderState.UPDATE_COMMIT_EVENT); + } + } + + public void addPeersToRPC(Iterable<RaftPeer> peers) { + serverRpc.addPeers(peers); + } + + synchronized void replyPendingRequest(long logIndex, + CompletableFuture<Message> message) { + if (isLeader() && leaderState != null) { // is leader and is running + leaderState.replyPendingRequest(logIndex, message); + } + } + + TransactionContext getTransactionContext(long index) { + if (leaderState != null) { // is leader and is running + return leaderState.getTransactionContext(index); + } + return null; + } + + public RaftProperties getProperties() { + return this.properties; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java new file mode 100644 index 0000000..6634152 --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerConstants.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.raft.server.impl; + +import org.apache.raft.client.RaftClient; + +public interface RaftServerConstants { + long INVALID_LOG_INDEX = -1; + byte LOG_TERMINATE_BYTE = 0; + long DEFAULT_SEQNUM = RaftClient.DEFAULT_SEQNUM; + + enum StartupOption { + FORMAT("format"), + REGULAR("regular"); + + private final String option; + + StartupOption(String arg) { + this.option = arg; + } + + public static StartupOption getOption(String arg) { + for (StartupOption s : StartupOption.values()) { + if (s.option.equals(arg)) { + return s; + } + } + return REGULAR; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerRpc.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerRpc.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerRpc.java new file mode 100644 index 0000000..fb74a0e --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerRpc.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import org.apache.raft.protocol.RaftPeer;
+import org.apache.raft.shaded.proto.RaftProtos.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * RPC service used by a raft server: start/shutdown control, an address
+ * accessor, and one send method per server-to-server request type
+ * (AppendEntries, InstallSnapshot, RequestVote). Each send method returns the
+ * reply proto directly and may throw {@link IOException}.
+ * NOTE(review): implementations are outside this file; the per-method
+ * descriptions below are taken from the signatures only.
+ */
+public interface RaftServerRpc {
+  /** Starts the RPC service. */
+  void start();
+
+  /** Shuts the RPC service down. */
+  void shutdown();
+
+  /** @return the socket address of this RPC service (presumably the local bind address; verify against implementations). */
+  InetSocketAddress getInetSocketAddress();
+
+  /** Sends an AppendEntries request and returns the reply. */
+  AppendEntriesReplyProto sendAppendEntries(
+      AppendEntriesRequestProto request) throws IOException;
+
+  /** Sends an InstallSnapshot request and returns the reply. */
+  InstallSnapshotReplyProto sendInstallSnapshot(
+      InstallSnapshotRequestProto request) throws IOException;
+
+  /** Sends a RequestVote request and returns the reply. */
+  RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request)
+      throws IOException;
+
+  /** add information of the given peers */
+  void addPeers(Iterable<RaftPeer> peers);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java new file mode 100644 index 0000000..cc8651d --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java @@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.raft.server.impl; + +import org.apache.raft.protocol.*; +import org.apache.raft.server.protocol.RaftServerProtocol; +import org.apache.raft.shaded.proto.RaftProtos.*; +import org.apache.raft.statemachine.StateMachine; +import org.apache.raft.statemachine.TransactionContext; +import org.apache.raft.util.RaftUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * Each RPC request is first handled by the RequestDispatcher: + * 1. A request from another RaftPeer is to be handled by RaftServer. + * + * If the raft peer is the leader, then: + * + * 2. A read-only request from client is to be handled by the state machine. + * 3. A write request from client is first validated by the state machine. The + * state machine returns the content of the raft log entry, which is then passed + * to the RaftServer for replication. 
+ */ +public class RequestDispatcher implements RaftClientProtocol, RaftServerProtocol { + static final Logger LOG = LoggerFactory.getLogger(RequestDispatcher.class); + + private final RaftServer server; + private final StateMachine stateMachine; + + public RequestDispatcher(RaftServer server) { + this.server = server; + this.stateMachine = server.getStateMachine(); + } + + public CompletableFuture<RaftClientReply> handleClientRequest( + RaftClientRequest request) throws IOException { + // first check the server's leader state + CompletableFuture<RaftClientReply> reply = server.checkLeaderState(request); + if (reply != null) { + return reply; + } + + // let the state machine handle read-only request from client + if (request.isReadOnly()) { + // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper, + // section 8 (last part) + return stateMachine.query(request); + } + + // TODO: this client request will not be added to pending requests + // until later which means that any failure in between will leave partial state in the + // state machine. 
We should call cancelTransaction() for failed requests + TransactionContext entry = stateMachine.startTransaction(request); + if (entry.getException().isPresent()) { + throw RaftUtils.asIOException(entry.getException().get()); + } + + return server.appendTransaction(request, entry); + } + + @Override + public RaftClientReply submitClientRequest(RaftClientRequest request) + throws IOException { + return waitForReply(server.getId(), request, handleClientRequest(request)); + } + + public CompletableFuture<RaftClientReply> setConfigurationAsync( + SetConfigurationRequest request) throws IOException { + return server.setConfiguration(request); + } + + @Override + public RaftClientReply setConfiguration(SetConfigurationRequest request) + throws IOException { + return waitForReply(server.getId(), request, setConfigurationAsync(request)); + } + + private static RaftClientReply waitForReply(String serverId, + RaftClientRequest request, CompletableFuture<RaftClientReply> future) + throws IOException { + try { + return future.get(); + } catch (InterruptedException e) { + final String s = serverId + ": Interrupted when waiting for reply, request=" + request; + LOG.info(s, e); + throw RaftUtils.toInterruptedIOException(s, e); + } catch (ExecutionException e) { + final Throwable cause = e.getCause(); + if (cause == null) { + throw new IOException(e); + } + if (cause instanceof NotLeaderException) { + return new RaftClientReply(request, (NotLeaderException)cause); + } else { + throw RaftUtils.asIOException(cause); + } + } + } + + @Override + public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) + throws IOException { + return server.requestVote(request); + } + + @Override + public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) + throws IOException { + return server.appendEntries(request); + } + + @Override + public InstallSnapshotReplyProto installSnapshot( + InstallSnapshotRequestProto request) throws IOException { + return 
server.installSnapshot(request); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/Role.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/Role.java b/raft-server/src/main/java/org/apache/raft/server/impl/Role.java new file mode 100644 index 0000000..1413961 --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/impl/Role.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.raft.server.impl; + +/** + * Role of Raft peer + */ +public enum Role { + LEADER, CANDIDATE, FOLLOWER +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java index 21fd719..4594666 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java @@ -18,7 +18,6 @@ package org.apache.raft.server.impl; import org.apache.raft.client.impl.ClientProtoUtils; -import org.apache.raft.server.RaftConfiguration; import org.apache.raft.server.protocol.TermIndex; import org.apache.raft.shaded.proto.RaftProtos.*; import org.apache.raft.util.ProtoUtils; @@ -27,7 +26,7 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import static org.apache.raft.server.RaftServerConstants.DEFAULT_SEQNUM; +import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM; import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS; public class ServerProtoUtils { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java new file mode 100644 index 0000000..6680175 --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.raft.server.impl; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.raft.conf.RaftProperties; +import org.apache.raft.server.protocol.TermIndex; +import org.apache.raft.server.storage.*; +import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.raft.statemachine.SnapshotInfo; +import org.apache.raft.statemachine.StateMachine; +import org.apache.raft.statemachine.TransactionContext; +import org.apache.raft.util.ProtoUtils; + +import java.io.Closeable; +import java.io.IOException; + +import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_DEFAULT; +import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY; + +/** + * Common states of a raft peer. Protected by RaftServer's lock. 
+ */ +public class ServerState implements Closeable { + private final String selfId; + private final RaftServer server; + /** Raft log */ + private final RaftLog log; + /** Raft configuration */ + private final ConfigurationManager configurationManager; + /** The thread that applies committed log entries to the state machine */ + private final StateMachineUpdater stateMachineUpdater; + /** local storage for log and snapshot */ + private final RaftStorage storage; + private final SnapshotManager snapshotManager; + + /** + * Latest term server has seen. initialized to 0 on first boot, increases + * monotonically. + */ + private long currentTerm; + /** + * The server ID of the leader for this term. Null means either there is + * no leader for this term yet or this server does not know who it is yet. + */ + private String leaderId; + /** + * Candidate that this peer granted vote for in current term (or null if none) + */ + private String votedFor; + + /** + * Latest installed snapshot for this server. This maybe different than StateMachine's latest + * snapshot. Once we successfully install a snapshot, the SM may not pick it up immediately. + * Further, this will not get updated when SM does snapshots itself. 
+ */ + private TermIndex latestInstalledSnapshot; + + ServerState(String id, RaftConfiguration conf, RaftProperties prop, + RaftServer server, StateMachine stateMachine) throws IOException { + this.selfId = id; + this.server = server; + configurationManager = new ConfigurationManager(conf); + storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR); + snapshotManager = new SnapshotManager(storage, id); + + long lastApplied = initStatemachine(stateMachine, prop); + + leaderId = null; + log = initLog(id, prop, server, lastApplied); + RaftLog.Metadata metadata = log.loadMetadata(); + currentTerm = metadata.getTerm(); + votedFor = metadata.getVotedFor(); + + stateMachineUpdater = new StateMachineUpdater(stateMachine, server, log, + lastApplied, prop); + } + + /** + * Used by tests to set initial raft configuration with correct port bindings. + */ + @VisibleForTesting + public void setInitialConf(RaftConfiguration initialConf) { + configurationManager.setInitialConf(initialConf); + } + + private long initStatemachine(StateMachine sm, RaftProperties properties) + throws IOException { + sm.initialize(selfId, properties, storage); + storage.setStateMachineStorage(sm.getStateMachineStorage()); + SnapshotInfo snapshot = sm.getLatestSnapshot(); + + if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) { + return RaftServerConstants.INVALID_LOG_INDEX; + } + + // get the raft configuration from the snapshot + RaftConfiguration raftConf = sm.getRaftConfiguration(); + if (raftConf != null) { + configurationManager.addConfiguration(raftConf.getLogEntryIndex(), + raftConf); + } + return snapshot.getIndex(); + } + + void start() { + stateMachineUpdater.start(); + } + + /** + * note we do not apply log entries to the state machine here since we do not + * know whether they have been committed. 
+ */ + private RaftLog initLog(String id, RaftProperties prop, RaftServer server, + long lastIndexInSnapshot) throws IOException { + final RaftLog log; + if (prop.getBoolean(RAFT_SERVER_USE_MEMORY_LOG_KEY, + RAFT_SERVER_USE_MEMORY_LOG_DEFAULT)) { + log = new MemoryRaftLog(id); + } else { + log = new SegmentedRaftLog(id, server, this.storage, + lastIndexInSnapshot, prop); + } + log.open(configurationManager, lastIndexInSnapshot); + return log; + } + + public RaftConfiguration getRaftConf() { + return configurationManager.getCurrent(); + } + + @VisibleForTesting + + public String getSelfId() { + return this.selfId; + } + + public long getCurrentTerm() { + return currentTerm; + } + + void setCurrentTerm(long term) { + currentTerm = term; + } + + String getLeaderId() { + return leaderId; + } + + boolean hasLeader() { + return leaderId != null; + } + + /** + * Become a candidate and start leader election + */ + long initElection() { + votedFor = selfId; + leaderId = null; + return ++currentTerm; + } + + void persistMetadata() throws IOException { + this.log.writeMetadata(currentTerm, votedFor); + } + + void resetLeaderAndVotedFor() { + votedFor = null; + leaderId = null; + } + + /** + * Vote for a candidate and update the local state. + */ + void grantVote(String candidateId) { + votedFor = candidateId; + leaderId = null; + } + + void setLeader(String leaderId) { + this.leaderId = leaderId; + } + + void becomeLeader() { + leaderId = selfId; + } + + public RaftLog getLog() { + return log; + } + + long applyLog(TransactionContext operation) throws IOException { + return log.append(currentTerm, operation); + } + + /** + * Check if accept the leader selfId and term from the incoming AppendEntries rpc. + * If accept, update the current state. 
+ * @return true if the check passes + */ + boolean recognizeLeader(String leaderId, long leaderTerm) { + if (leaderTerm < currentTerm) { + return false; + } else if (leaderTerm > currentTerm || this.leaderId == null) { + // If the request indicates a term that is greater than the current term + // or no leader has been set for the current term, make sure to update + // leader and term later + return true; + } + Preconditions.checkArgument(this.leaderId.equals(leaderId), + "selfId:%s, this.leaderId:%s, received leaderId:%s", + selfId, this.leaderId, leaderId); + return true; + } + + /** + * Check if the candidate's term is acceptable + */ + boolean recognizeCandidate(String candidateId, + long candidateTerm) { + if (candidateTerm > currentTerm) { + return true; + } else if (candidateTerm == currentTerm) { + // has not voted yet or this is a retry + return votedFor == null || votedFor.equals(candidateId); + } + return false; + } + + boolean isLogUpToDate(TermIndex candidateLastEntry) { + LogEntryProto lastEntry = log.getLastEntry(); + // need to take into account snapshot + SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot(); + if (lastEntry == null && snapshot == null) { + return true; + } else if (candidateLastEntry == null) { + return false; + } + TermIndex local = ServerProtoUtils.toTermIndex(lastEntry); + if (local == null || (snapshot != null && snapshot.getIndex() > lastEntry.getIndex())) { + local = snapshot.getTermIndex(); + } + return local.compareTo(candidateLastEntry) <= 0; + } + + @Override + public String toString() { + return selfId + ":t" + currentTerm + ", leader=" + leaderId + + ", voted=" + votedFor + ", raftlog=" + log + ", conf=" + getRaftConf(); + } + + boolean isConfCommitted() { + return getLog().getLastCommittedIndex() >= + getRaftConf().getLogEntryIndex(); + } + + public void setRaftConf(long logIndex, RaftConfiguration conf) { + configurationManager.addConfiguration(logIndex, conf); + RaftServer.LOG.info("{}: successfully 
update the configuration {}", + getSelfId(), conf); + } + + void updateConfiguration(LogEntryProto[] entries) { + if (entries != null && entries.length > 0) { + configurationManager.removeConfigurations(entries[0].getIndex()); + for (LogEntryProto entry : entries) { + if (ProtoUtils.isConfigurationLogEntry(entry)) { + final RaftConfiguration conf = ServerProtoUtils.toRaftConfiguration( + entry.getIndex(), entry.getConfigurationEntry()); + configurationManager.addConfiguration(entry.getIndex(), conf); + server.addPeersToRPC(conf.getPeers()); + } + } + } + } + + void updateStatemachine(long majorityIndex, long currentTerm) { + log.updateLastCommitted(majorityIndex, currentTerm); + stateMachineUpdater.notifyUpdater(); + } + + void reloadStateMachine(long lastIndexInSnapshot, long currentTerm) + throws IOException { + log.updateLastCommitted(lastIndexInSnapshot, currentTerm); + + stateMachineUpdater.reloadStateMachine(); + } + + @Override + public void close() throws IOException { + stateMachineUpdater.stop(); + RaftServer.LOG.info("{} closes. 
The last applied log index is {}", + getSelfId(), getLastAppliedIndex()); + storage.close(); + } + + @VisibleForTesting + public RaftStorage getStorage() { + return storage; + } + + void installSnapshot(InstallSnapshotRequestProto request) throws IOException { + // TODO: verify that we need to install the snapshot + StateMachine sm = server.getStateMachine(); + sm.pause(); // pause the SM to prepare for install snapshot + snapshotManager.installSnapshot(sm, request); + log.syncWithSnapshot(request.getTermIndex().getIndex()); + this.latestInstalledSnapshot = ServerProtoUtils.toTermIndex( + request.getTermIndex()); + } + + SnapshotInfo getLatestSnapshot() { + return server.getStateMachine().getStateMachineStorage().getLatestSnapshot(); + } + + public TermIndex getLatestInstalledSnapshot() { + return latestInstalledSnapshot; + } + + @VisibleForTesting + public long getLastAppliedIndex() { + return stateMachineUpdater.getLastAppliedIndex(); + } + + boolean isCurrentConfCommitted() { + return getRaftConf().getLogEntryIndex() <= getLog().getLastCommittedIndex(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java b/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java new file mode 100644 index 0000000..06fa221 --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.raft.server.impl; + +import com.google.common.base.Preconditions; +import org.apache.raft.conf.RaftProperties; +import org.apache.raft.protocol.Message; +import org.apache.raft.server.RaftServerConfigKeys; +import org.apache.raft.server.storage.RaftLog; +import org.apache.raft.server.storage.RaftStorage; +import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.raft.statemachine.SnapshotInfo; +import org.apache.raft.statemachine.StateMachine; +import org.apache.raft.statemachine.TransactionContext; +import org.apache.raft.util.Daemon; +import org.apache.raft.util.ExitUtils; +import org.apache.raft.util.LifeCycle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; +import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY; + +/** + * This class tracks the log entries that have been committed in a quorum and + * applies them to the state machine. We let a separate thread do this work + * asynchronously so that this will not block normal raft protocol. 
+ * + * If the auto log compaction is enabled, the state machine updater thread will + * trigger a snapshot of the state machine by calling + * {@link StateMachine#takeSnapshot} when the log size exceeds a limit. + */ +class StateMachineUpdater implements Runnable { + static final Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class); + + enum State { + RUNNING, STOP, RELOAD + } + + private final RaftProperties properties; + private final StateMachine stateMachine; + private final RaftServer server; + private final RaftLog raftLog; + + private volatile long lastAppliedIndex; + + private final boolean autoSnapshotEnabled; + private final long snapshotThreshold; + private long lastSnapshotIndex; + + private final Thread updater; + private volatile State state = State.RUNNING; + + StateMachineUpdater(StateMachine stateMachine, RaftServer server, + RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) { + this.properties = properties; + this.stateMachine = stateMachine; + this.server = server; + this.raftLog = raftLog; + + this.lastAppliedIndex = lastAppliedIndex; + lastSnapshotIndex = lastAppliedIndex; + + autoSnapshotEnabled = properties.getBoolean( + RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY, + RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_DEFAULT); + snapshotThreshold = properties.getLong( + RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY, + RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_DEFAULT); + updater = new Daemon(this); + } + + void start() { + updater.start(); + } + + void stop() { + state = State.STOP; + updater.interrupt(); + try { + stateMachine.close(); + } catch (IOException ignored) { + } + } + + void reloadStateMachine() { + state = State.RELOAD; + notifyUpdater(); + } + + synchronized void notifyUpdater() { + notifyAll(); + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + "-" + raftLog.getSelfId(); + } + + @Override + public void 
run() { + final RaftStorage storage = server.getState().getStorage(); + while (isRunning()) { + try { + synchronized (this) { + // when the peers just start, the committedIndex is initialized as 0 + // and will be updated only after the leader contacts other peers. + // Thus initially lastAppliedIndex can be greater than lastCommitted. + while (lastAppliedIndex >= raftLog.getLastCommittedIndex()) { + wait(); + } + } + + final long committedIndex = raftLog.getLastCommittedIndex(); + Preconditions.checkState(lastAppliedIndex < committedIndex); + + if (state == State.RELOAD) { + Preconditions.checkState(stateMachine.getLifeCycleState() == LifeCycle.State.PAUSED); + + stateMachine.reinitialize(server.getId(), properties, storage); + + SnapshotInfo snapshot = stateMachine.getLatestSnapshot(); + Preconditions.checkState(snapshot != null && snapshot.getIndex() > lastAppliedIndex, + "Snapshot: %s, lastAppliedIndex: %s", snapshot, lastAppliedIndex); + + lastAppliedIndex = snapshot.getIndex(); + lastSnapshotIndex = snapshot.getIndex(); + state = State.RUNNING; + } + + while (lastAppliedIndex < committedIndex) { + final LogEntryProto next = raftLog.get(lastAppliedIndex + 1); + if (next != null) { + if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) { + // the reply should have already been set. only need to record + // the new conf in the state machine. + stateMachine.setRaftConfiguration( + ServerProtoUtils.toRaftConfiguration(next.getIndex(), + next.getConfigurationEntry())); + } else if (next.getLogEntryBodyCase() == SMLOGENTRY) { + // check whether there is a TransactionContext because we are the leader. + TransactionContext trx = server.getTransactionContext(next.getIndex()); + if (trx == null) { + trx = new TransactionContext(stateMachine, next); + } + + // Let the StateMachine inject logic for committed transactions in sequential order. 
+ trx = stateMachine.applyTransactionSerial(trx); + + // TODO: This step can be parallelized + CompletableFuture<Message> messageFuture = + stateMachine.applyTransaction(trx); + server.replyPendingRequest(next.getIndex(), messageFuture); + } + lastAppliedIndex++; + } else { + LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}", + this, lastAppliedIndex + 1, state); + break; + } + } + + // check if need to trigger a snapshot + if (shouldTakeSnapshot(lastAppliedIndex)) { + stateMachine.takeSnapshot(); + // TODO purge logs, including log cache. but should keep log for leader's RPCSenders + lastSnapshotIndex = lastAppliedIndex; + } + } catch (InterruptedException e) { + if (!isRunning()) { + LOG.info("{}: the StateMachineUpdater is interrupted and will exit.", this); + } else { + final String s = this + ": the StateMachineUpdater is wrongly interrupted"; + ExitUtils.terminate(1, s, e, LOG); + } + } catch (Throwable t) { + final String s = this + ": the StateMachineUpdater hits Throwable"; + ExitUtils.terminate(2, s, t, LOG); + } + } + } + + private boolean isRunning() { + return state != State.STOP; + } + + private boolean shouldTakeSnapshot(long currentAppliedIndex) { + return autoSnapshotEnabled && (state != State.RELOAD) && + (currentAppliedIndex - lastSnapshotIndex >= snapshotThreshold); + } + + long getLastAppliedIndex() { + return lastAppliedIndex; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java index e058ea3..95597b2 100644 --- a/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java +++ b/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java @@ -29,7 
+29,7 @@ import java.io.EOFException; import java.io.File; import java.io.IOException; -import static org.apache.raft.server.RaftServerConstants.INVALID_LOG_INDEX; +import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX; public class LogInputStream implements Closeable { static final Logger LOG = LoggerFactory.getLogger(LogInputStream.class); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java index 1ecb5d5..0dc8029 100644 --- a/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java +++ b/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java @@ -18,7 +18,7 @@ package org.apache.raft.server.storage; import org.apache.raft.conf.RaftProperties; -import org.apache.raft.server.RaftServerConstants; +import org.apache.raft.server.impl.RaftServerConstants; import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream; import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; import org.apache.raft.util.PureJavaCrc32C; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java index 4e37e2a..9523cac 100644 --- a/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java +++ b/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java @@ -20,7 +20,7 @@ package org.apache.raft.server.storage; import 
com.google.common.base.Preconditions; import org.apache.commons.io.Charsets; import org.apache.raft.protocol.ChecksumException; -import org.apache.raft.server.RaftServerConstants; +import org.apache.raft.server.impl.RaftServerConstants; import org.apache.raft.shaded.com.google.protobuf.CodedInputStream; import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream; import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java index b2531f0..987cc6c 100644 --- a/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java +++ b/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java @@ -18,7 +18,7 @@ package org.apache.raft.server.storage; import com.google.common.base.Preconditions; -import org.apache.raft.server.ConfigurationManager; +import org.apache.raft.server.impl.ConfigurationManager; import org.apache.raft.server.impl.ServerProtoUtils; import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream; import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java b/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java index d87b0a4..c12e1aa 100644 --- a/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java +++ b/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java @@ -18,8 +18,8 @@ package 
org.apache.raft.server.storage; import com.google.common.base.Preconditions; -import org.apache.raft.server.RaftConfiguration; -import org.apache.raft.server.RaftServerConstants; +import org.apache.raft.server.impl.RaftConfiguration; +import org.apache.raft.server.impl.RaftServerConstants; import org.apache.raft.server.impl.ServerProtoUtils; import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; import org.apache.raft.util.AutoCloseableLock; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java index 42f1391..acd44b0 100644 --- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java +++ b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java @@ -18,9 +18,9 @@ package org.apache.raft.server.storage; import com.google.common.base.Preconditions; -import org.apache.raft.server.ConfigurationManager; -import org.apache.raft.server.RaftConfiguration; -import org.apache.raft.server.RaftServerConstants; +import org.apache.raft.server.impl.ConfigurationManager; +import org.apache.raft.server.impl.RaftConfiguration; +import org.apache.raft.server.impl.RaftServerConstants; import org.apache.raft.server.impl.ServerProtoUtils; import org.apache.raft.server.protocol.TermIndex; import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java index bdc0675..d022a91 
100644 --- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java +++ b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java @@ -19,14 +19,14 @@ package org.apache.raft.server.storage; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.raft.server.RaftServerConstants; +import org.apache.raft.server.impl.RaftServerConstants; import org.apache.raft.server.storage.LogSegment.LogRecord; import org.apache.raft.server.storage.LogSegment.SegmentFileInfo; import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; import java.util.*; -import static org.apache.raft.server.RaftServerConstants.INVALID_LOG_INDEX; +import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX; /** * In-memory RaftLog Cache. Currently we provide a simple implementation that http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java index 090be49..1837e94 100644 --- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java +++ b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java @@ -20,8 +20,8 @@ package org.apache.raft.server.storage; import com.google.common.base.Preconditions; import org.apache.raft.conf.RaftProperties; import org.apache.raft.io.nativeio.NativeIO; -import org.apache.raft.server.RaftServer; -import org.apache.raft.server.RaftServerConstants; +import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerConstants; import org.apache.raft.server.storage.LogSegment.SegmentFileInfo; import org.apache.raft.server.storage.RaftLogCache.TruncationSegments; import 
org.apache.raft.server.storage.SegmentedRaftLog.Task; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java index 8646b9a..434f505 100644 --- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java +++ b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java @@ -19,7 +19,7 @@ package org.apache.raft.server.storage; import com.google.common.base.Preconditions; import org.apache.raft.conf.RaftProperties; -import org.apache.raft.server.RaftServerConstants; +import org.apache.raft.server.impl.RaftServerConstants; import org.apache.raft.server.storage.RaftStorageDirectory.StorageState; import org.apache.raft.statemachine.SnapshotInfo; import org.apache.raft.statemachine.StateMachineStorage; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java index e47f3a6..662e4ec 100644 --- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java +++ b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java @@ -40,7 +40,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import static java.nio.file.Files.newDirectoryStream; -import static org.apache.raft.server.RaftServerConstants.INVALID_LOG_INDEX; +import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX; public class RaftStorageDirectory { 
static final Logger LOG = LoggerFactory.getLogger(RaftStorageDirectory.class); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java b/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java index d87fc0a..9c55491 100644 --- a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java +++ b/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java @@ -21,9 +21,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.commons.io.Charsets; import org.apache.raft.conf.RaftProperties; -import org.apache.raft.server.ConfigurationManager; -import org.apache.raft.server.RaftServer; -import org.apache.raft.server.RaftServerConstants; +import org.apache.raft.server.impl.ConfigurationManager; +import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerConstants; import org.apache.raft.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; import org.apache.raft.util.AutoCloseableLock; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java b/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java index 33fff3f..ccc52c7 100644 --- a/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java +++ b/raft-server/src/main/java/org/apache/raft/statemachine/BaseStateMachine.java @@ -22,8 +22,8 @@ import 
org.apache.raft.conf.RaftProperties; import org.apache.raft.protocol.Message; import org.apache.raft.protocol.RaftClientReply; import org.apache.raft.protocol.RaftClientRequest; -import org.apache.raft.server.RaftConfiguration; -import org.apache.raft.server.RaftServerConstants; +import org.apache.raft.server.impl.RaftConfiguration; +import org.apache.raft.server.impl.RaftServerConstants; import org.apache.raft.server.storage.RaftStorage; import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto; import org.apache.raft.util.LifeCycle; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java b/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java index ad9fee3..bedb5b0 100644 --- a/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java +++ b/raft-server/src/main/java/org/apache/raft/statemachine/SimpleStateMachineStorage.java @@ -21,7 +21,7 @@ package org.apache.raft.statemachine; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.raft.io.MD5Hash; -import org.apache.raft.server.RaftConfiguration; +import org.apache.raft.server.impl.RaftConfiguration; import org.apache.raft.server.protocol.TermIndex; import org.apache.raft.server.storage.FileInfo; import org.apache.raft.server.storage.RaftStorage;
