http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/ServerState.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/ServerState.java b/raft-server/src/main/java/org/apache/raft/server/ServerState.java
deleted file mode 100644
index 8975eb3..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/ServerState.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.*;
-import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.ProtoUtils;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_DEFAULT;
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY;
-
-/**
- * Common states of a raft peer. Protected by RaftServer's lock.
- *
- * <p>Owns the raft log, the configuration manager, the local storage, the
- * snapshot manager and the {@link StateMachineUpdater}, together with the
- * persisted election state (current term and votedFor).
- */
-public class ServerState implements Closeable {
-  /** The id of this server. */
-  private final String selfId;
-  /** The owning server; used to reach its state machine and RPC layer. */
-  private final RaftServer server;
-  /** Raft log */
-  private final RaftLog log;
-  /** Raft configuration */
-  private final ConfigurationManager configurationManager;
-  /** The thread that applies committed log entries to the state machine */
-  private final StateMachineUpdater stateMachineUpdater;
-  /** local storage for log and snapshot */
-  private final RaftStorage storage;
-  /** Installs snapshots sent by the leader; see installSnapshot. */
-  private final SnapshotManager snapshotManager;
-
-  /**
-   * Latest term server has seen. initialized to 0 on first boot, increases
-   * monotonically.
-   */
-  private long currentTerm;
-  /**
-   * The server ID of the leader for this term. Null means either there is
-   * no leader for this term yet or this server does not know who it is yet.
-   */
-  private String leaderId;
-  /**
-   * Candidate that this peer granted vote for in current term (or null if none)
-   */
-  private String votedFor;
-
-  /**
-   * Latest installed snapshot for this server. This may be different from the
-   * StateMachine's latest snapshot: once we successfully install a snapshot,
-   * the SM may not pick it up immediately. Further, this is not updated when
-   * the SM takes snapshots itself.
-   */
-  private TermIndex latestInstalledSnapshot;
-
-  /**
-   * Creates the server state. The state machine is initialized first, and it
-   * may register the configuration stored with its latest snapshot. Next the
-   * raft log is opened relative to that snapshot's index. Finally the term and
-   * votedFor are restored from the log metadata. The StateMachineUpdater is
-   * created but not started; see {@link #start()}.
-   */
-  ServerState(String id, RaftConfiguration conf, RaftProperties prop,
-      RaftServer server, StateMachine stateMachine) throws IOException {
-    this.selfId = id;
-    this.server = server;
-    configurationManager = new ConfigurationManager(conf);
-    storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR);
-    snapshotManager = new SnapshotManager(storage, id);
-
-    long lastApplied = initStatemachine(stateMachine, prop);
-
-    leaderId = null;
-    log = initLog(id, prop, server, lastApplied);
-    RaftLog.Metadata metadata = log.loadMetadata();
-    currentTerm = metadata.getTerm();
-    votedFor = metadata.getVotedFor();
-
-    stateMachineUpdater = new StateMachineUpdater(stateMachine, server, log,
-         lastApplied, prop);
-  }
-
-  /**
-   * Used by tests to set initial raft configuration with correct port bindings.
-   */
-  @VisibleForTesting
-  public void setInitialConf(RaftConfiguration initialConf) {
-    configurationManager.setInitialConf(initialConf);
-  }
-
-  /**
-   * Initializes the state machine and attaches its storage to the raft storage.
-   *
-   * @return the index of the state machine's latest snapshot, or
-   *         {@code RaftServerConstants.INVALID_LOG_INDEX} if it has no valid
-   *         snapshot. When a snapshot exists, the raft configuration reported
-   *         by the state machine (if any) is added to the configuration manager.
-   */
-  private long initStatemachine(StateMachine sm, RaftProperties properties)
-      throws IOException {
-    sm.initialize(selfId, properties, storage);
-    storage.setStateMachineStorage(sm.getStateMachineStorage());
-    SnapshotInfo snapshot = sm.getLatestSnapshot();
-
-    if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) {
-      return RaftServerConstants.INVALID_LOG_INDEX;
-    }
-
-    // get the raft configuration from the snapshot
-    RaftConfiguration raftConf = sm.getRaftConfiguration();
-    if (raftConf != null) {
-      configurationManager.addConfiguration(raftConf.getLogEntryIndex(),
-          raftConf);
-    }
-    return snapshot.getIndex();
-  }
-
-  /** Starts the StateMachineUpdater. */
-  void start() {
-    stateMachineUpdater.start();
-  }
-
-  /**
-   * Creates and opens the raft log. If RAFT_SERVER_USE_MEMORY_LOG_KEY is set,
-   * the log is a MemoryRaftLog; otherwise it is a SegmentedRaftLog backed by
-   * this server's RaftStorage.
-   *
-   * note we do not apply log entries to the state machine here since we do not
-   * know whether they have been committed.
-   */
-  private RaftLog initLog(String id, RaftProperties prop, RaftServer server,
-      long lastIndexInSnapshot) throws IOException {
-    final RaftLog log;
-    if (prop.getBoolean(RAFT_SERVER_USE_MEMORY_LOG_KEY,
-        RAFT_SERVER_USE_MEMORY_LOG_DEFAULT)) {
-      log = new MemoryRaftLog(id);
-    } else {
-      log = new SegmentedRaftLog(id, server, this.storage,
-          lastIndexInSnapshot, prop);
-    }
-    log.open(configurationManager, lastIndexInSnapshot);
-    return log;
-  }
-
-  /** @return the current raft configuration */
-  public RaftConfiguration getRaftConf() {
-    return configurationManager.getCurrent();
-  }
-
-  /** @return the id of this server */
-  @VisibleForTesting
-
-  public String getSelfId() {
-    return this.selfId;
-  }
-
-  public long getCurrentTerm() {
-    return currentTerm;
-  }
-
-  /** Overwrites the in-memory term; does not persist it (see persistMetadata). */
-  void setCurrentTerm(long term) {
-    currentTerm = term;
-  }
-
-  String getLeaderId() {
-    return leaderId;
-  }
-
-  boolean hasLeader() {
-    return leaderId != null;
-  }
-
-  /**
-   * Become a candidate and start a new election round. The server votes for
-   * itself, forgets the known leader and increments the term. Nothing is
-   * persisted here; callers must call persistMetadata.
-   *
-   * @return the new (incremented) term
-   */
-  long initElection() {
-    votedFor = selfId;
-    leaderId = null;
-    return ++currentTerm;
-  }
-
-  /** Persists the current term and votedFor through the raft log's metadata. */
-  void persistMetadata() throws IOException {
-    this.log.writeMetadata(currentTerm, votedFor);
-  }
-
-  /** Clears both the vote and the known leader (in memory only). */
-  void resetLeaderAndVotedFor() {
-    votedFor = null;
-    leaderId = null;
-  }
-
-  /**
-   * Vote for a candidate and update the local state.
-   */
-  void grantVote(String candidateId) {
-    votedFor = candidateId;
-    leaderId = null;
-  }
-
-  void setLeader(String leaderId) {
-    this.leaderId = leaderId;
-  }
-
-  /** Records this server as the leader of the current term. */
-  void becomeLeader() {
-    leaderId = selfId;
-  }
-
-  public RaftLog getLog() {
-    return log;
-  }
-
-  /**
-   * Appends the transaction to the raft log under the current term.
-   *
-   * @return the value returned by RaftLog#append (presumably the new entry's
-   *         index; TODO confirm)
-   */
-  long applyLog(TransactionContext operation) throws IOException {
-    return log.append(currentTerm, operation);
-  }
-
-  /**
-   * Checks whether to accept the leader id and term from an incoming
-   * AppendEntries RPC. This method does not modify any state; when it returns
-   * true, the caller is expected to update the leader and term.
-   *
-   * @return true if the check passes, false if leaderTerm is stale
-   * @throws IllegalArgumentException if, in the current term, a different
-   *         leader is already known
-   */
-  boolean recognizeLeader(String leaderId, long leaderTerm) {
-    if (leaderTerm < currentTerm) {
-      return false;
-    } else if (leaderTerm > currentTerm || this.leaderId == null) {
-      // If the request indicates a term that is greater than the current term
-      // or no leader has been set for the current term, make sure to update
-      // leader and term later
-      return true;
-    }
-    Preconditions.checkArgument(this.leaderId.equals(leaderId),
-        "selfId:%s, this.leaderId:%s, received leaderId:%s",
-        selfId, this.leaderId, leaderId);
-    return true;
-  }
-
-  /**
-   * Checks whether the candidate's term is acceptable. A higher term is always
-   * accepted. The current term is accepted only if this server has not voted
-   * yet or has already voted for the same candidate (a retry).
-   */
-  boolean recognizeCandidate(String candidateId,
-      long candidateTerm) {
-    if (candidateTerm > currentTerm) {
-      return true;
-    } else if (candidateTerm == currentTerm) {
-      // has not voted yet or this is a retry
-      return votedFor == null || votedFor.equals(candidateId);
-    }
-    return false;
-  }
-
-  /**
-   * Returns true if the candidate's last entry is at least as up to date as
-   * the local one. The local last entry comes from the raft log. It comes from
-   * the state machine's latest snapshot instead when the log is empty or the
-   * snapshot index is larger.
-   * NOTE(review): this reads the snapshot from the state machine, while
-   * getLatestSnapshot() reads it from the state machine storage; confirm the
-   * two agree.
-   */
-  boolean isLogUpToDate(TermIndex candidateLastEntry) {
-    LogEntryProto lastEntry = log.getLastEntry();
-    // need to take into account snapshot
-    SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
-     if (lastEntry == null && snapshot == null) {
-      return true;
-    } else if (candidateLastEntry == null) {
-      return false;
-    }
-    TermIndex local = ServerProtoUtils.toTermIndex(lastEntry);
-    if (local == null || (snapshot != null && snapshot.getIndex() > lastEntry.getIndex())) {
-      local = snapshot.getTermIndex();
-    }
-    return local.compareTo(candidateLastEntry) <= 0;
-  }
-
-  @Override
-  public String toString() {
-    return selfId + ":t" + currentTerm + ", leader=" + leaderId
-        + ", voted=" + votedFor + ", raftlog=" + log + ", conf=" + getRaftConf();
-  }
-
-  /**
-   * @return true if the current configuration's log entry has been committed.
-   * NOTE(review): same condition as isCurrentConfCommitted().
-   */
-  boolean isConfCommitted() {
-    return getLog().getLastCommittedIndex() >=
-        getRaftConf().getLogEntryIndex();
-  }
-
-  /** Records conf as the configuration at logIndex, making it current. */
-  public void setRaftConf(long logIndex, RaftConfiguration conf) {
-    configurationManager.addConfiguration(logIndex, conf);
-    RaftServer.LOG.info("{}: successfully update the configuration {}",
-        getSelfId(), conf);
-  }
-
-  /**
-   * Rebuilds the configuration history for newly received entries. First,
-   * configurations at or after the first entry's index are dropped. Then every
-   * configuration entry among the given entries is re-registered, and its
-   * peers are added to the server's RPC layer.
-   */
-  void updateConfiguration(LogEntryProto[] entries) {
-    if (entries != null && entries.length > 0) {
-      configurationManager.removeConfigurations(entries[0].getIndex());
-      for (LogEntryProto entry : entries) {
-        if (ProtoUtils.isConfigurationLogEntry(entry)) {
-          final RaftConfiguration conf = ServerProtoUtils.toRaftConfiguration(
-              entry.getIndex(), entry.getConfigurationEntry());
-          configurationManager.addConfiguration(entry.getIndex(), conf);
-          server.addPeersToRPC(conf.getPeers());
-        }
-      }
-    }
-  }
-
-  /** Advances the commit index and wakes up the StateMachineUpdater. */
-  void updateStatemachine(long majorityIndex, long currentTerm) {
-    log.updateLastCommitted(majorityIndex, currentTerm);
-    stateMachineUpdater.notifyUpdater();
-  }
-
-  /**
-   * Moves the commit index to the snapshot's last index and asks the
-   * StateMachineUpdater to reload the state machine.
-   */
-  void reloadStateMachine(long lastIndexInSnapshot, long currentTerm)
-      throws IOException {
-    log.updateLastCommitted(lastIndexInSnapshot, currentTerm);
-
-    stateMachineUpdater.reloadStateMachine();
-  }
-
-  /** Stops the StateMachineUpdater, logs the last applied index, closes storage. */
-  @Override
-  public void close() throws IOException {
-    stateMachineUpdater.stop();
-    RaftServer.LOG.info("{} closes. The last applied log index is {}",
-        getSelfId(), getLastAppliedIndex());
-    storage.close();
-  }
-
-  @VisibleForTesting
-  public RaftStorage getStorage() {
-    return storage;
-  }
-
-  /**
-   * Installs a snapshot chunk sent by the leader. The state machine is paused,
-   * the snapshot data is written, the log is synced with the snapshot index,
-   * and the installed TermIndex is remembered.
-   * NOTE(review): resuming the state machine is presumably done by the
-   * StateMachineUpdater on reload; confirm.
-   */
-  void installSnapshot(InstallSnapshotRequestProto request) throws IOException {
-    // TODO: verify that we need to install the snapshot
-    StateMachine sm = server.getStateMachine();
-    sm.pause(); // pause the SM to prepare for install snapshot
-    snapshotManager.installSnapshot(sm, request);
-    log.syncWithSnapshot(request.getTermIndex().getIndex());
-    this.latestInstalledSnapshot = ServerProtoUtils.toTermIndex(
-        request.getTermIndex());
-  }
-
-  /** @return the latest snapshot known to the state machine's storage */
-  SnapshotInfo getLatestSnapshot() {
-    return server.getStateMachine().getStateMachineStorage().getLatestSnapshot();
-  }
-
-  public TermIndex getLatestInstalledSnapshot() {
-    return latestInstalledSnapshot;
-  }
-
-  @VisibleForTesting
-  public long getLastAppliedIndex() {
-    return stateMachineUpdater.getLastAppliedIndex();
-  }
-
-  /** @return true if the current configuration's log entry has been committed */
-  boolean isCurrentConfCommitted() {
-    return getRaftConf().getLogEntryIndex() <= getLog().getLastCommittedIndex();
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/StateMachineUpdater.java b/raft-server/src/main/java/org/apache/raft/server/StateMachineUpdater.java
deleted file mode 100644
index b6f88be..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/StateMachineUpdater.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.Message;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.server.storage.RaftStorage;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.Daemon;
-import org.apache.raft.util.ExitUtils;
-import org.apache.raft.util.LifeCycle;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
-import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY;
-
-/**
- * This class tracks the log entries that have been committed in a quorum and
- * applies them to the state machine. We let a separate thread do this work
- * asynchronously so that this will not block normal raft protocol.
- *
- * If auto snapshot is enabled, the state machine updater thread triggers a
- * snapshot of the state machine by calling
- * {@link StateMachine#takeSnapshot} once the number of entries applied since
- * the last snapshot reaches the configured trigger threshold.
- */
-class StateMachineUpdater implements Runnable {
-  static final Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class);
-
-  /** RUNNING: applying entries; STOP: exit the loop; RELOAD: reinitialize the SM. */
-  enum State {
-    RUNNING, STOP, RELOAD
-  }
-
-  /** Passed to StateMachine#reinitialize when handling a RELOAD. */
-  private final RaftProperties properties;
-  private final StateMachine stateMachine;
-  private final RaftServer server;
-  private final RaftLog raftLog;
-
-  /**
-   * Index of the last log entry applied to the state machine. It is advanced
-   * by the updater thread. It is volatile so that getLastAppliedIndex() callers
-   * on other threads see the latest value.
-   */
-  private volatile long lastAppliedIndex;
-
-  /** Auto-snapshot settings, read once from the properties in the constructor. */
-  private final boolean autoSnapshotEnabled;
-  private final long snapshotThreshold;
-  /** Applied index at which the last snapshot was taken or loaded. */
-  private long lastSnapshotIndex;
-
-  /** The daemon thread executing {@link #run()}. */
-  private final Thread updater;
-  private volatile State state = State.RUNNING;
-
-  StateMachineUpdater(StateMachine stateMachine, RaftServer server,
-      RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) {
-    this.properties = properties;
-    this.stateMachine = stateMachine;
-    this.server = server;
-    this.raftLog = raftLog;
-
-    this.lastAppliedIndex = lastAppliedIndex;
-    lastSnapshotIndex = lastAppliedIndex;
-
-    autoSnapshotEnabled = properties.getBoolean(
-        RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_DEFAULT);
-    snapshotThreshold = properties.getLong(
-        RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_DEFAULT);
-    updater = new Daemon(this);
-  }
-
-  /** Starts the updater daemon thread. */
-  void start() {
-    updater.start();
-  }
-
-  /**
-   * Stops the updater: sets STOP, interrupts the updater thread and closes the
-   * state machine.
-   * NOTE(review): an IOException from closing the state machine is silently
-   * ignored.
-   */
-  void stop() {
-    state = State.STOP;
-    updater.interrupt();
-    try {
-      stateMachine.close();
-    } catch (IOException ignored) {
-    }
-  }
-
-  /**
-   * Requests that the updater thread reinitialize the state machine from its
-   * latest snapshot. The reload is performed in run() after the commit index
-   * has moved past lastAppliedIndex.
-   */
-  void reloadStateMachine() {
-    state = State.RELOAD;
-    notifyUpdater();
-  }
-
-  /** Wakes up the updater thread if it is waiting for newly committed entries. */
-  synchronized void notifyUpdater() {
-    notifyAll();
-  }
-
-  @Override
-  public String toString() {
-    return this.getClass().getSimpleName() + "-" + raftLog.getSelfId();
-  }
-
-  /**
-   * Main loop of the updater thread. Each iteration:
-   * <ol>
-   *   <li>waits until the commit index is ahead of lastAppliedIndex;</li>
-   *   <li>handles a pending RELOAD by reinitializing the paused state machine
-   *       and jumping lastAppliedIndex to its snapshot index;</li>
-   *   <li>applies committed entries in index order, stopping early if an
-   *       entry is not available in the log;</li>
-   *   <li>takes a snapshot if one is due.</li>
-   * </ol>
-   * An interrupt while still running, or any other Throwable, is passed to
-   * ExitUtils.terminate with status 1 or 2 respectively.
-   */
-  @Override
-  public void run() {
-    final RaftStorage storage = server.getState().getStorage();
-    while (isRunning()) {
-      try {
-        synchronized (this) {
-          // when the peers just start, the committedIndex is initialized as 0
-          // and will be updated only after the leader contacts other peers.
-          // Thus initially lastAppliedIndex can be greater than lastCommitted.
-          while (lastAppliedIndex >= raftLog.getLastCommittedIndex()) {
-            wait();
-          }
-        }
-
-        final long committedIndex = raftLog.getLastCommittedIndex();
-        Preconditions.checkState(lastAppliedIndex < committedIndex);
-
-        if (state == State.RELOAD) {
-          Preconditions.checkState(stateMachine.getLifeCycleState() == LifeCycle.State.PAUSED);
-
-          stateMachine.reinitialize(server.getId(), properties, storage);
-
-          SnapshotInfo snapshot = stateMachine.getLatestSnapshot();
-          Preconditions.checkState(snapshot != null && snapshot.getIndex() > lastAppliedIndex,
-              "Snapshot: %s, lastAppliedIndex: %s", snapshot, lastAppliedIndex);
-
-          lastAppliedIndex = snapshot.getIndex();
-          lastSnapshotIndex = snapshot.getIndex();
-          state = State.RUNNING;
-        }
-
-        while (lastAppliedIndex < committedIndex) {
-          final LogEntryProto next = raftLog.get(lastAppliedIndex + 1);
-          if (next != null) {
-            if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
-              // the reply should have already been set. only need to record
-              // the new conf in the state machine.
-              stateMachine.setRaftConfiguration(
-                  ServerProtoUtils.toRaftConfiguration(next.getIndex(),
-                      next.getConfigurationEntry()));
-            } else if (next.getLogEntryBodyCase() == SMLOGENTRY) {
-              // check whether there is a TransactionContext because we are the leader.
-              TransactionContext trx = server.getTransactionContext(next.getIndex());
-              if (trx == null) {
-                trx = new TransactionContext(stateMachine, next);
-              }
-
-              // Let the StateMachine inject logic for committed transactions in sequential order.
-              trx = stateMachine.applyTransactionSerial(trx);
-
-              // TODO: This step can be parallelized
-              CompletableFuture<Message> messageFuture =
-                  stateMachine.applyTransaction(trx);
-              server.replyPendingRequest(next.getIndex(), messageFuture);
-            }
-            lastAppliedIndex++;
-          } else {
-            // Entry not in the log: stop applying for this round and go back
-            // to waiting.
-            LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",
-                this, lastAppliedIndex + 1, state);
-            break;
-          }
-        }
-
-        // check if need to trigger a snapshot
-        if (shouldTakeSnapshot(lastAppliedIndex)) {
-          stateMachine.takeSnapshot();
-          // TODO purge logs, including log cache. but should keep log for leader's RPCSenders
-          lastSnapshotIndex = lastAppliedIndex;
-        }
-      } catch (InterruptedException e) {
-        if (!isRunning()) {
-          LOG.info("{}: the StateMachineUpdater is interrupted and will exit.", this);
-        } else {
-          final String s = this + ": the StateMachineUpdater is wrongly interrupted";
-          ExitUtils.terminate(1, s, e, LOG);
-        }
-      } catch (Throwable t) {
-        final String s = this + ": the StateMachineUpdater hits Throwable";
-        ExitUtils.terminate(2, s, t, LOG);
-      }
-    }
-  }
-
-  /** @return false once stop() has been called */
-  private boolean isRunning() {
-    return state != State.STOP;
-  }
-
-  /**
-   * @return true if auto snapshot is enabled, no reload is pending, and at
-   *         least snapshotThreshold entries have been applied since the last
-   *         snapshot
-   */
-  private boolean shouldTakeSnapshot(long currentAppliedIndex) {
-    return autoSnapshotEnabled && (state != State.RELOAD) &&
-        (currentAppliedIndex - lastSnapshotIndex >= snapshotThreshold);
-  }
-
-  long getLastAppliedIndex() {
-    return lastAppliedIndex;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ConfigurationManager.java b/raft-server/src/main/java/org/apache/raft/server/impl/ConfigurationManager.java
new file mode 100644
index 0000000..b2f077d
--- /dev/null
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/ConfigurationManager.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Maintains the mapping between log indices and the raft configurations
+ * introduced at those indices.
+ *
+ * <p>Initialized when starting the raft peer. The mappings are loaded from the
+ * raft log, and updated while configuration log entries are appended or
+ * truncated. All state is accessed under this object's monitor.
+ */
+public class ConfigurationManager {
+  /** Configuration in effect when no log-indexed configuration is recorded. */
+  private RaftConfiguration initialConf;
+  /** Configurations keyed by the log index of the entry that introduced them. */
+  private final NavigableMap<Long, RaftConfiguration> configurations =
+      new TreeMap<>();
+  /**
+   * The current raft configuration. If configurations is not empty, it should
+   * be the value of the last entry of the map; otherwise it is initialConf.
+   */
+  private RaftConfiguration currentConf;
+
+  public ConfigurationManager(RaftConfiguration initialConf) {
+    setInitialConf(initialConf);
+  }
+
+  /**
+   * Replaces the initial configuration and makes it the current one.
+   *
+   * <p>initialConf should conceptually be final, but tests need to change the
+   * initial configuration to reflect the correct port bindings.
+   * NOTE(review): this also overrides currentConf even if log-indexed
+   * configurations have already been added.
+   */
+  @VisibleForTesting
+  public synchronized void setInitialConf(RaftConfiguration initialConf) {
+    this.initialConf = initialConf;
+    this.currentConf = initialConf;
+  }
+
+  /**
+   * Records {@code conf} as the configuration introduced at {@code logIndex}
+   * and makes it the current configuration.
+   *
+   * @throws IllegalArgumentException if {@code logIndex} is not strictly
+   *         greater than the index of the last recorded configuration
+   */
+  public synchronized void addConfiguration(long logIndex,
+      RaftConfiguration conf) {
+    Preconditions.checkArgument(
+        configurations.isEmpty() || configurations.lastKey() < logIndex,
+        "logIndex %s must be greater than the last configuration index %s",
+        logIndex, configurations.isEmpty() ? null : configurations.lastKey());
+    configurations.put(logIndex, conf);
+    this.currentConf = conf;
+  }
+
+  /** @return the current raft configuration */
+  synchronized RaftConfiguration getCurrent() {
+    return currentConf;
+  }
+
+  /**
+   * Removes all the configurations whose log index is >= the given index.
+   *
+   * @param index all configurations whose log index is >= this value are
+   *              removed
+   * @return the new current configuration: the one with the largest log index
+   *         < {@code index}, or the initial configuration if none remains
+   */
+  synchronized RaftConfiguration removeConfigurations(long index) {
+    // tailMap returns a live view, so clearing it removes the entries from
+    // the backing map.
+    configurations.tailMap(index, true).clear();
+    currentConf = configurations.isEmpty() ? initialConf
+        : configurations.lastEntry().getValue();
+    return currentConf;
+  }
+
+  /** @return the number of known configurations, counting the initial one */
+  @VisibleForTesting
+  synchronized int numOfConf() {
+    return 1 + configurations.size();
+  }
+
+  // TODO: remove Configuration entries after they are committed
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/FollowerInfo.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/FollowerInfo.java b/raft-server/src/main/java/org/apache/raft/server/impl/FollowerInfo.java
new file mode 100644
index 0000000..683599e
--- /dev/null
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/FollowerInfo.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import org.apache.raft.protocol.RaftPeer;
+import org.apache.raft.util.Timestamp;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Replication progress and RPC timing for a single follower peer.
+ *
+ * <p>Thread-safety: {@code nextIndex} is guarded by this object's monitor.
+ * The match index and RPC timestamps are held in atomics, and
+ * {@code attendVote} is volatile.
+ */
+public class FollowerInfo {
+  private final RaftPeer peer;
+  private final AtomicReference<Timestamp> lastRpcResponseTime;
+  private final AtomicReference<Timestamp> lastRpcSendTime;
+  /** Guarded by {@code this}. */
+  private long nextIndex;
+  private final AtomicLong matchIndex;
+  /** Set by {@link #startAttendVote()}; read by {@link #isAttendingVote()}. */
+  private volatile boolean attendVote;
+
+  FollowerInfo(RaftPeer peer, Timestamp lastRpcTime, long nextIndex,
+      boolean attendVote) {
+    this.peer = peer;
+    this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime);
+    this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
+    this.nextIndex = nextIndex;
+    this.matchIndex = new AtomicLong(0);
+    this.attendVote = attendVote;
+  }
+
+  /** Sets the match index to the given value (no monotonicity check). */
+  public void updateMatchIndex(final long matchIndex) {
+    this.matchIndex.set(matchIndex);
+  }
+
+  long getMatchIndex() {
+    return matchIndex.get();
+  }
+
+  public synchronized long getNextIndex() {
+    return nextIndex;
+  }
+
+  public synchronized void updateNextIndex(long i) {
+    nextIndex = i;
+  }
+
+  /**
+   * Decrements nextIndex by one, capped at {@code targetIndex}. Does nothing
+   * once nextIndex has reached 0.
+   */
+  public synchronized void decreaseNextIndex(long targetIndex) {
+    if (nextIndex > 0) {
+      nextIndex = Math.min(nextIndex - 1, targetIndex);
+    }
+  }
+
+  @Override
+  public String toString() {
+    // nextIndex is guarded by this object's monitor; read it through the
+    // synchronized getter instead of racing with concurrent updates
+    // (an unsynchronized long read may also tear, JLS 17.7).
+    return peer.getId() + "(next=" + getNextIndex() + ", match=" + matchIndex + ","
+        + " attendVote=" + attendVote
+        + ", lastRpcSendTime=" + lastRpcSendTime
+        + ", lastRpcResponseTime=" + lastRpcResponseTime + ")";
+  }
+
+  /** Marks this follower as attending votes from now on. */
+  void startAttendVote() {
+    attendVote = true;
+  }
+
+  public boolean isAttendingVote() {
+    return attendVote;
+  }
+
+  public RaftPeer getPeer() {
+    return peer;
+  }
+
+  /** Update lastRpcResponseTime to the current time. */
+  public void updateLastRpcResponseTime() {
+    lastRpcResponseTime.set(new Timestamp());
+  }
+
+  public Timestamp getLastRpcResponseTime() {
+    return lastRpcResponseTime.get();
+  }
+
+  /** Update lastRpcSendTime to the current time. */
+  public void updateLastRpcSendTime() {
+    lastRpcSendTime.set(new Timestamp());
+  }
+
+  /**
+   * @return {@code Timestamp.latest} of the last RPC response time and the
+   *         last RPC send time (presumably whichever is more recent)
+   */
+  public Timestamp getLastRpcTime() {
+    return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java b/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java
new file mode 100644
index 0000000..61b3c92
--- /dev/null
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import org.apache.raft.util.Daemon;
+import org.apache.raft.util.Timestamp;
+import org.slf4j.Logger;
+
+/**
+ * Used when the peer is a follower. Used to track the election timeout.
+ *
+ * <p>The monitor thread sleeps for a randomized election timeout. If, after
+ * waking, no RPC has been recorded via {@link #updateLastRpcTime(boolean)}
+ * for at least that long and the follower is not in log sync, it changes
+ * the server to a candidate.
+ */
+class FollowerState extends Daemon {
+  static final Logger LOG = RaftServer.LOG;
+
+  private final RaftServer server;
+
+  /** Time of the last RPC recorded through {@link #updateLastRpcTime}. */
+  private volatile Timestamp lastRpcTime = new Timestamp();
+  /** Cleared by {@link #stopRunning()} to make the monitor loop exit. */
+  private volatile boolean monitorRunning = true;
+  /** While true, an elapsed election timeout does not trigger an election. */
+  private volatile boolean inLogSync = false;
+
+  FollowerState(RaftServer server) {
+    this.server = server;
+  }
+
+  /**
+   * Resets lastRpcTime to now and records whether the follower is in log sync.
+   *
+   * @param inLogSync while true, the monitor does not start an election even
+   *                  if the election timeout has elapsed
+   */
+  void updateLastRpcTime(boolean inLogSync) {
+    lastRpcTime = new Timestamp();
+    LOG.trace("{} update last rpc time to {}", server.getId(), lastRpcTime);
+    this.inLogSync = inLogSync;
+  }
+
+  Timestamp getLastRpcTime() {
+    return lastRpcTime;
+  }
+
+  /**
+   * @return true if less than the server's minimum timeout has elapsed since
+   *         the last recorded RPC
+   */
+  boolean shouldWithholdVotes() {
+    return lastRpcTime.elapsedTimeMs() < server.getMinTimeoutMs();
+  }
+
+  /** Asks the monitor loop to exit at its next check. */
+  void stopRunning() {
+    this.monitorRunning = false;
+  }
+
+  @Override
+  public void run() {
+    while (monitorRunning && server.isFollower()) {
+      final long electionTimeout = server.getRandomTimeoutMs();
+      try {
+        Thread.sleep(electionTimeout);
+        if (!monitorRunning || !server.isFollower()) {
+          LOG.info("{} heartbeat monitor quit", server.getId());
+          break;
+        }
+        synchronized (server) {
+          if (!inLogSync && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
+            LOG.info("{} changes to {}, lastRpcTime:{}, electionTimeout:{}",
+                server.getId(), Role.CANDIDATE, lastRpcTime, electionTimeout);
+            // election timeout, should become a candidate
+            server.changeToCandidate();
+            break;
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.info("{} was interrupted: {}", this, e.toString());
+        LOG.trace("TRACE", e);
+        // Restore the interrupt status rather than swallowing it before the
+        // monitor thread exits.
+        Thread.currentThread().interrupt();
+        return;
+      } catch (Exception e) {
+        LOG.warn("{} caught an exception", this, e);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return server.getId() + ": " + getClass().getSimpleName();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java
new file mode 100644
index 0000000..10e901f
--- /dev/null
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.raft.server.impl; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.raft.protocol.RaftPeer; +import org.apache.raft.server.protocol.TermIndex; +import org.apache.raft.shaded.proto.RaftProtos.RequestVoteReplyProto; +import org.apache.raft.shaded.proto.RaftProtos.RequestVoteRequestProto; +import org.apache.raft.statemachine.SnapshotInfo; +import org.apache.raft.util.Daemon; +import org.apache.raft.util.ProtoUtils; +import org.apache.raft.util.Timestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +class LeaderElection extends Daemon { + public static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class); + + private ResultAndTerm logAndReturn(Result result, + List<RequestVoteReplyProto> responses, + List<Exception> exceptions, long newTerm) { + LOG.info(server.getId() + ": Election " + result + "; received " + + responses.size() + " response(s) " + + responses.stream().map(r -> ProtoUtils.toString(r)).collect(Collectors.toList()) + + " and " + exceptions.size() + " exception(s); " + server.getState()); + int i = 0; + for(Exception e : exceptions) { + LOG.info(" " + i++ + ": " + e); + LOG.trace("TRACE", e); + } + return new ResultAndTerm(result, newTerm); + } + + enum Result {PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN} + + private static class ResultAndTerm { + final Result result; + final long term; + + ResultAndTerm(Result result, long term) { + this.result = result; + this.term = term; + } + } + + private final RaftServer server; + private ExecutorCompletionService<RequestVoteReplyProto> service; + private ExecutorService executor; + private volatile boolean running; + /** + * The Raft configuration should not change while the peer is in 
candidate + * state. If the configuration changes, another peer should be acting as a + * leader and this LeaderElection session should end. + */ + private final RaftConfiguration conf; + private final Collection<RaftPeer> others; + + LeaderElection(RaftServer server) { + this.server = server; + conf = server.getRaftConf(); + others = conf.getOtherPeers(server.getId()); + this.running = true; + } + + void stopRunning() { + this.running = false; + } + + private void initExecutor() { + Preconditions.checkState(!others.isEmpty()); + executor = Executors.newFixedThreadPool(others.size(), + new ThreadFactoryBuilder().setDaemon(true).build()); + service = new ExecutorCompletionService<>(executor); + } + + @Override + public void run() { + try { + askForVotes(); + } catch (InterruptedException e) { + // the leader election thread is interrupted. The peer may already step + // down to a follower. The leader election should skip. + LOG.info("The leader election thread of peer {} is interrupted. " + + "Currently role: {}.", server.getId(), server.getRole()); + } catch (IOException e) { + LOG.warn("Failed to persist votedFor/term. Exit the leader election.", e); + stopRunning(); + } + } + + /** + * After a peer changes its role to candidate, it invokes this method to + * send out requestVote rpc to all other peers. 
+ */ + private void askForVotes() throws InterruptedException, IOException { + final ServerState state = server.getState(); + while (running && server.isCandidate()) { + // one round of requestVotes + final long electionTerm; + synchronized (server) { + electionTerm = state.initElection(); + server.getState().persistMetadata(); + } + LOG.info(state.getSelfId() + ": begin an election in Term " + + electionTerm); + + TermIndex lastEntry = ServerProtoUtils.toTermIndex( + state.getLog().getLastEntry()); + if (lastEntry == null) { + // lastEntry may need to be derived from snapshot + SnapshotInfo snapshot = state.getLatestSnapshot(); + if (snapshot != null) { + lastEntry = snapshot.getTermIndex(); + } + } + + final ResultAndTerm r; + if (others.isEmpty()) { + r = new ResultAndTerm(Result.PASSED, electionTerm); + } else { + try { + initExecutor(); + int submitted = submitRequests(electionTerm, lastEntry); + r = waitForResults(electionTerm, submitted); + } finally { + if (executor != null) { + executor.shutdown(); + } + } + } + + synchronized (server) { + if (electionTerm != state.getCurrentTerm() || !running || + !server.isCandidate()) { + return; // term already passed or no longer a candidate. + } + + switch (r.result) { + case PASSED: + server.changeToLeader(); + return; + case SHUTDOWN: + LOG.info("{} received shutdown response when requesting votes.", + server.getId()); + server.close(); + return; + case REJECTED: + case DISCOVERED_A_NEW_TERM: + final long term = r.term > server.getState().getCurrentTerm() ? 
+ r.term : server.getState().getCurrentTerm(); + server.changeToFollower(term, true); + return; + case TIMEOUT: + // should start another election + } + } + } + } + + private int submitRequests(final long electionTerm, final TermIndex lastEntry) { + int submitted = 0; + for (final RaftPeer peer : others) { + final RequestVoteRequestProto r = server.createRequestVoteRequest( + peer.getId(), electionTerm, lastEntry); + service.submit( + () -> server.getServerRpc().sendRequestVote(r)); + submitted++; + } + return submitted; + } + + private ResultAndTerm waitForResults(final long electionTerm, + final int submitted) throws InterruptedException { + final Timestamp timeout = new Timestamp().addTimeMs(server.getRandomTimeoutMs()); + final List<RequestVoteReplyProto> responses = new ArrayList<>(); + final List<Exception> exceptions = new ArrayList<>(); + int waitForNum = submitted; + Collection<String> votedPeers = new ArrayList<>(); + while (waitForNum > 0 && running && server.isCandidate()) { + final long waitTime = -timeout.elapsedTimeMs(); + if (waitTime <= 0) { + return logAndReturn(Result.TIMEOUT, responses, exceptions, -1); + } + + try { + final Future<RequestVoteReplyProto> future = service.poll( + waitTime, TimeUnit.MILLISECONDS); + if (future == null) { + continue; // poll timeout, continue to return Result.TIMEOUT + } + + final RequestVoteReplyProto r = future.get(); + responses.add(r); + if (r.getShouldShutdown()) { + return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1); + } + if (r.getTerm() > electionTerm) { + return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses, + exceptions, r.getTerm()); + } + if (r.getServerReply().getSuccess()) { + votedPeers.add(r.getServerReply().getReplyId()); + if (conf.hasMajority(votedPeers, server.getId())) { + return logAndReturn(Result.PASSED, responses, exceptions, -1); + } + } + } catch(ExecutionException e) { + LOG.info("Got exception when requesting votes: " + e); + LOG.trace("TRACE", e); + 
exceptions.add(e); + } + waitForNum--; + } + // received all the responses + return logAndReturn(Result.REJECTED, responses, exceptions, -1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java new file mode 100644 index 0000000..79d0d9e --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java @@ -0,0 +1,581 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.raft.server.impl; + +import com.google.common.base.Preconditions; +import org.apache.raft.conf.RaftProperties; +import org.apache.raft.protocol.*; +import org.apache.raft.server.storage.RaftLog; +import org.apache.raft.shaded.proto.RaftProtos.LeaderNoOp; +import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.raft.statemachine.TransactionContext; +import org.apache.raft.util.Daemon; +import org.apache.raft.util.ProtoUtils; +import org.apache.raft.util.Timestamp; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.raft.server.RaftServerConfigKeys.*; +import static org.apache.raft.server.impl.LeaderState.StateUpdateEventType.*; + +/** + * States for leader only. It contains three different types of processors: + * 1. RPC senders: each thread is appending log to a follower + * 2. EventProcessor: a single thread updating the raft server's state based on + * status of log appending response + * 3. 
PendingRequestHandler: a handler sending back responses to clients when + * corresponding log entries are committed + */ +public class LeaderState { + private static final Logger LOG = RaftServer.LOG; + + enum StateUpdateEventType { + STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS + } + + enum BootStrapProgress { + NOPROGRESS, PROGRESSING, CAUGHTUP + } + + static class StateUpdateEvent { + final StateUpdateEventType type; + final long newTerm; + + StateUpdateEvent(StateUpdateEventType type, long newTerm) { + this.type = type; + this.newTerm = newTerm; + } + } + + static final StateUpdateEvent UPDATE_COMMIT_EVENT = + new StateUpdateEvent(StateUpdateEventType.UPDATECOMMIT, -1); + static final StateUpdateEvent STAGING_PROGRESS_EVENT = + new StateUpdateEvent(StateUpdateEventType.STAGINGPROGRESS, -1); + + private final RaftServer server; + private final RaftLog raftLog; + private final long currentTerm; + private volatile ConfigurationStagingState stagingState; + private List<List<FollowerInfo>> voterLists; + + /** + * The list of threads appending entries to followers. + * The list is protected by the RaftServer's lock. 
+ */ + private final List<LogAppender> senders; + private final BlockingQueue<StateUpdateEvent> eventQ; + private final EventProcessor processor; + private final PendingRequests pendingRequests; + private volatile boolean running = true; + + private final int stagingCatchupGap; + private final int snapshotChunkMaxSize; + private final int syncInterval; + + LeaderState(RaftServer server, RaftProperties properties) { + this.server = server; + + stagingCatchupGap = properties.getInt( + RAFT_SERVER_STAGING_CATCHUP_GAP_KEY, + RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT); + snapshotChunkMaxSize = properties.getInt( + RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY, + RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT); + syncInterval = properties.getInt( + RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY, + RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT); + + final ServerState state = server.getState(); + this.raftLog = state.getLog(); + this.currentTerm = state.getCurrentTerm(); + eventQ = new ArrayBlockingQueue<>(4096); + processor = new EventProcessor(); + pendingRequests = new PendingRequests(server); + + final RaftConfiguration conf = server.getRaftConf(); + Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId()); + final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); + final long nextIndex = raftLog.getNextIndex(); + senders = new ArrayList<>(others.size()); + for (RaftPeer p : others) { + FollowerInfo f = new FollowerInfo(p, t, nextIndex, true); + senders.add(server.getLogAppenderFactory().getLogAppender(server, this, f)); + } + voterLists = divideFollowers(conf); + } + + void start() { + // In the beginning of the new term, replicate an empty entry in order + // to finally commit entries in the previous term. + // Also this message can help identify the last committed index when + // the leader peer is just started. 
+ final LogEntryProto placeHolder = LogEntryProto.newBuilder() + .setTerm(server.getState().getCurrentTerm()) + .setIndex(raftLog.getNextIndex()) + .setNoOp(LeaderNoOp.newBuilder()).build(); + raftLog.append(placeHolder); + + processor.start(); + startSenders(); + } + + private void startSenders() { + senders.forEach(Thread::start); + } + + void stop() { + this.running = false; + // do not interrupt event processor since it may be in the middle of logSync + for (LogAppender sender : senders) { + sender.stopSender(); + sender.interrupt(); + } + try { + pendingRequests.sendNotLeaderResponses(); + } catch (IOException e) { + LOG.warn("Caught exception in sendNotLeaderResponses", e); + } + } + + void notifySenders() { + senders.forEach(LogAppender::notifyAppend); + } + + boolean inStagingState() { + return stagingState != null; + } + + ConfigurationStagingState getStagingState() { + return stagingState; + } + + long getCurrentTerm() { + return currentTerm; + } + + int getSnapshotChunkMaxSize() { + return snapshotChunkMaxSize; + } + + int getSyncInterval() { + return syncInterval; + } + + /** + * Start bootstrapping new peers + */ + PendingRequest startSetConfiguration(SetConfigurationRequest request) { + Preconditions.checkState(running && !inStagingState()); + + RaftPeer[] peersInNewConf = request.getPeersInNewConf(); + Collection<RaftPeer> peersToBootStrap = RaftConfiguration + .computeNewPeers(peersInNewConf, server.getRaftConf()); + + // add the request to the pending queue + final PendingRequest pending = pendingRequests.addConfRequest(request); + + ConfigurationStagingState stagingState = new ConfigurationStagingState( + peersToBootStrap, new PeerConfiguration(Arrays.asList(peersInNewConf))); + Collection<RaftPeer> newPeers = stagingState.getNewPeers(); + // set the staging state + this.stagingState = stagingState; + + if (newPeers.isEmpty()) { + applyOldNewConf(); + } else { + // update the LeaderState's sender list + addSenders(newPeers); + } + return pending; 
+ } + + PendingRequest addPendingRequest(long index, RaftClientRequest request, + TransactionContext entry) { + return pendingRequests.addPendingRequest(index, request, entry); + } + + private void applyOldNewConf() { + final ServerState state = server.getState(); + final RaftConfiguration current = server.getRaftConf(); + final RaftConfiguration oldNewConf= stagingState.generateOldNewConf(current, + state.getLog().getNextIndex()); + // apply the (old, new) configuration to log, and use it as the current conf + long index = state.getLog().append(state.getCurrentTerm(), oldNewConf); + updateConfiguration(index, oldNewConf); + + this.stagingState = null; + notifySenders(); + } + + private void updateConfiguration(long logIndex, RaftConfiguration newConf) { + voterLists = divideFollowers(newConf); + server.getState().setRaftConf(logIndex, newConf); + } + + /** + * After receiving a setConfiguration request, the leader should update its + * RpcSender list. + */ + void addSenders(Collection<RaftPeer> newMembers) { + final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); + final long nextIndex = raftLog.getNextIndex(); + for (RaftPeer peer : newMembers) { + FollowerInfo f = new FollowerInfo(peer, t, nextIndex, false); + LogAppender sender = server.getLogAppenderFactory() + .getLogAppender(server, this, f); + senders.add(sender); + sender.start(); + } + } + + /** + * Update the RpcSender list based on the current configuration + */ + private void updateSenders(RaftConfiguration conf) { + Preconditions.checkState(conf.isStable() && !inStagingState()); + Iterator<LogAppender> iterator = senders.iterator(); + while (iterator.hasNext()) { + LogAppender sender = iterator.next(); + if (!conf.containsInConf(sender.getFollower().getPeer().getId())) { + iterator.remove(); + sender.stopSender(); + sender.interrupt(); + } + } + } + + void submitUpdateStateEvent(StateUpdateEvent event) { + try { + eventQ.put(event); + } catch (InterruptedException e) { + 
LOG.info("Interrupted when adding event {} into the queue", event); + } + } + + private void prepare() { + synchronized (server) { + if (running) { + final RaftConfiguration conf = server.getRaftConf(); + if (conf.isTransitional() && server.getState().isConfCommitted()) { + // the configuration is in transitional state, and has been committed + // so it is time to generate and replicate (new) conf. + replicateNewConf(); + } + } + } + } + + /** + * The processor thread takes the responsibility to update the raft server's + * state, such as changing to follower, or updating the committed index. + */ + private class EventProcessor extends Daemon { + @Override + public void run() { + // apply an empty message; check if necessary to replicate (new) conf + prepare(); + + while (running) { + try { + StateUpdateEvent event = eventQ.poll(server.getMaxTimeoutMs(), + TimeUnit.MILLISECONDS); + synchronized (server) { + if (running) { + handleEvent(event); + } + } + // the updated configuration does not need to be sync'ed here + } catch (InterruptedException e) { + if (!running) { + LOG.info("The LeaderState gets is stopped"); + } else { + LOG.warn("The leader election thread of peer {} is interrupted. 
" + + "Currently role: {}.", server.getId(), server.getRole()); + throw new RuntimeException(e); + } + } catch (IOException e) { + LOG.warn("Failed to persist new votedFor/term.", e); + // the failure should happen while changing the state to follower + // thus the in-memory state should have been updated + Preconditions.checkState(!running); + } + } + } + } + + private void handleEvent(StateUpdateEvent e) throws IOException { + if (e == null) { + if (inStagingState()) { + checkNewPeers(); + } + } else { + if (e.type == STEPDOWN) { + server.changeToFollower(e.newTerm, true); + } else if (e.type == UPDATECOMMIT) { + updateLastCommitted(); + } else if (e.type == STAGINGPROGRESS) { + checkNewPeers(); + } + } + } + + /** + * So far we use a simple implementation for catchup checking: + * 1. If the latest rpc time of the remote peer is before 3 * max_timeout, + * the peer made no progress for that long. We should fail the whole + * setConfiguration request. + * 2. If the peer's matching index is just behind for a small gap, and the + * peer was updated recently (within max_timeout), declare the peer as + * caught-up. + * 3. Otherwise the peer is making progressing. Keep waiting. 
+ */ + private BootStrapProgress checkProgress(FollowerInfo follower, + long committed) { + Preconditions.checkArgument(!follower.isAttendingVote()); + final Timestamp progressTime = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); + final Timestamp timeoutTime = new Timestamp().addTimeMs(-3*server.getMaxTimeoutMs()); + if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) { + LOG.debug("{} detects a follower {} timeout for bootstrapping," + + " timeoutTime: {}", server.getId(), follower, timeoutTime); + return BootStrapProgress.NOPROGRESS; + } else if (follower.getMatchIndex() + stagingCatchupGap > committed + && follower.getLastRpcResponseTime().compareTo(progressTime) > 0) { + return BootStrapProgress.CAUGHTUP; + } else { + return BootStrapProgress.PROGRESSING; + } + } + + private Collection<BootStrapProgress> checkAllProgress(long committed) { + Preconditions.checkState(inStagingState()); + return senders.stream() + .filter(sender -> !sender.getFollower().isAttendingVote()) + .map(sender -> checkProgress(sender.getFollower(), committed)) + .collect(Collectors.toCollection(ArrayList::new)); + } + + private void checkNewPeers() { + if (!inStagingState()) { + // it is possible that the bootstrapping is done and we still have + // remaining STAGINGPROGRESS event to handle. + updateLastCommitted(); + } else { + final long committedIndex = server.getState().getLog() + .getLastCommittedIndex(); + Collection<BootStrapProgress> reports = checkAllProgress(committedIndex); + if (reports.contains(BootStrapProgress.NOPROGRESS)) { + LOG.debug("{} fails the setConfiguration request", server.getId()); + stagingState.fail(); + } else if (!reports.contains(BootStrapProgress.PROGRESSING)) { + // all caught up! 
+ applyOldNewConf(); + for (LogAppender sender : senders) { + sender.getFollower().startAttendVote(); + } + } + } + } + + boolean isBootStrappingPeer(String peerId) { + return inStagingState() && getStagingState().contains(peerId); + } + + private void updateLastCommitted() { + final String selfId = server.getId(); + final RaftConfiguration conf = server.getRaftConf(); + long majorityInNewConf = computeLastCommitted(voterLists.get(0), + conf.containsInConf(selfId)); + final long oldLastCommitted = raftLog.getLastCommittedIndex(); + final LogEntryProto[] entriesToCommit; + if (!conf.isTransitional()) { + // copy the entries that may get committed out of the raftlog, to prevent + // the possible race that the log gets purged after the statemachine does + // a snapshot + entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, + Math.max(majorityInNewConf, oldLastCommitted) + 1); + server.getState().updateStatemachine(majorityInNewConf, currentTerm); + } else { // configuration is in transitional state + long majorityInOldConf = computeLastCommitted(voterLists.get(1), + conf.containsInOldConf(selfId)); + final long majority = Math.min(majorityInNewConf, majorityInOldConf); + entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, + Math.max(majority, oldLastCommitted) + 1); + server.getState().updateStatemachine(majority, currentTerm); + } + checkAndUpdateConfiguration(entriesToCommit); + } + + private boolean committedConf(LogEntryProto[] entries) { + final long currentCommitted = raftLog.getLastCommittedIndex(); + for (LogEntryProto entry : entries) { + if (entry.getIndex() <= currentCommitted && + ProtoUtils.isConfigurationLogEntry(entry)) { + return true; + } + } + return false; + } + + private void checkAndUpdateConfiguration(LogEntryProto[] entriesToCheck) { + final RaftConfiguration conf = server.getRaftConf(); + if (committedConf(entriesToCheck)) { + if (conf.isTransitional()) { + replicateNewConf(); + } else { // the (new) log entry has been committed + 
LOG.debug("{} sends success to setConfiguration request", server.getId()); + pendingRequests.replySetConfiguration(); + // if the leader is not included in the current configuration, step down + if (!conf.containsInConf(server.getId())) { + LOG.info("{} is not included in the new configuration {}. Step down.", + server.getId(), conf); + try { + // leave some time for all RPC senders to send out new conf entry + Thread.sleep(server.getMinTimeoutMs()); + } catch (InterruptedException ignored) { + } + // the pending request handler will send NotLeaderException for + // pending client requests when it stops + server.close(); + } + } + } + } + + /** + * when the (old, new) log entry has been committed, should replicate (new): + * 1) append (new) to log + * 2) update conf to (new) + * 3) update RpcSenders list + * 4) start replicating the log entry + */ + private void replicateNewConf() { + final RaftConfiguration conf = server.getRaftConf(); + final RaftConfiguration newConf = RaftConfiguration.newBuilder() + .setConf(conf) + .setLogEntryIndex(raftLog.getNextIndex()) + .build(); + // stop the LogAppender if the corresponding follower is no longer in the conf + updateSenders(newConf); + long index = raftLog.append(server.getState().getCurrentTerm(), newConf); + updateConfiguration(index, newConf); + notifySenders(); + } + + private long computeLastCommitted(List<FollowerInfo> followers, + boolean includeSelf) { + final int length = includeSelf ? 
followers.size() + 1 : followers.size(); + final long[] indices = new long[length]; + for (int i = 0; i < followers.size(); i++) { + indices[i] = followers.get(i).getMatchIndex(); + } + if (includeSelf) { + // note that we also need to wait for the local disk I/O + indices[length - 1] = raftLog.getLatestFlushedIndex(); + } + + Arrays.sort(indices); + return indices[(indices.length - 1) / 2]; + } + + private List<List<FollowerInfo>> divideFollowers(RaftConfiguration conf) { + List<List<FollowerInfo>> lists = new ArrayList<>(2); + List<FollowerInfo> listForNew = senders.stream() + .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId())) + .map(LogAppender::getFollower) + .collect(Collectors.toList()); + lists.add(listForNew); + if (conf.isTransitional()) { + List<FollowerInfo> listForOld = senders.stream() + .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId())) + .map(LogAppender::getFollower) + .collect(Collectors.toList()); + lists.add(listForOld); + } + return lists; + } + + PendingRequest returnNoConfChange(SetConfigurationRequest r) { + PendingRequest pending = new PendingRequest(r); + pending.setSuccessReply(null); + return pending; + } + + void replyPendingRequest(long logIndex, CompletableFuture<Message> message) { + pendingRequests.replyPendingRequest(logIndex, message); + } + + TransactionContext getTransactionContext(long index) { + return pendingRequests.getTransactionContext(index); + } + + private class ConfigurationStagingState { + private final Map<String, RaftPeer> newPeers; + private final PeerConfiguration newConf; + + ConfigurationStagingState(Collection<RaftPeer> newPeers, + PeerConfiguration newConf) { + Map<String, RaftPeer> map = new HashMap<>(); + for (RaftPeer peer : newPeers) { + map.put(peer.getId(), peer); + } + this.newPeers = Collections.unmodifiableMap(map); + this.newConf = newConf; + } + + RaftConfiguration generateOldNewConf(RaftConfiguration current, + long logIndex) { + return 
RaftConfiguration.newBuilder() + .setConf(newConf) + .setOldConf(current) + .setLogEntryIndex(logIndex) + .build(); + } + + Collection<RaftPeer> getNewPeers() { + return newPeers.values(); + } + + boolean contains(String peerId) { + return newPeers.containsKey(peerId); + } + + void fail() { + Iterator<LogAppender> iterator = senders.iterator(); + while (iterator.hasNext()) { + LogAppender sender = iterator.next(); + if (!sender.getFollower().isAttendingVote()) { + iterator.remove(); + sender.stopSender(); + sender.interrupt(); + } + } + LeaderState.this.stagingState = null; + // send back failure response to client's request + pendingRequests.failSetConfiguration( + new ReconfigurationTimeoutException("Fail to set configuration " + + newConf + ". Timeout when bootstrapping new peers.")); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java new file mode 100644 index 0000000..143da28 --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java @@ -0,0 +1,480 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.raft.server.impl; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.io.IOUtils; +import org.apache.raft.server.impl.LeaderState.StateUpdateEventType; +import org.apache.raft.server.protocol.TermIndex; +import org.apache.raft.server.storage.FileInfo; +import org.apache.raft.server.storage.RaftLog; +import org.apache.raft.shaded.com.google.protobuf.ByteString; +import org.apache.raft.shaded.proto.RaftProtos.*; +import org.apache.raft.statemachine.SnapshotInfo; +import org.apache.raft.util.Daemon; +import org.apache.raft.util.ProtoUtils; +import org.apache.raft.util.Timestamp; +import org.slf4j.Logger; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.file.Path; +import java.util.*; + +import static org.apache.raft.server.RaftServerConfigKeys.*; +import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX; + +/** + * A daemon thread appending log entries to a follower peer. 
+ */ +public class LogAppender extends Daemon { + public static final Logger LOG = RaftServer.LOG; + + protected final RaftServer server; + private final LeaderState leaderState; + protected final RaftLog raftLog; + protected final FollowerInfo follower; + private final int maxBufferSize; + private final boolean batchSending; + private final LogEntryBuffer buffer; + private final long leaderTerm; + + private volatile boolean sending = true; + + public LogAppender(RaftServer server, LeaderState leaderState, FollowerInfo f) { + this.follower = f; + this.server = server; + this.leaderState = leaderState; + this.raftLog = server.getState().getLog(); + this.maxBufferSize = server.getProperties().getInt( + RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY, + RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT); + this.batchSending = server.getProperties().getBoolean( + RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY, + RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT); + this.buffer = new LogEntryBuffer(); + this.leaderTerm = server.getState().getCurrentTerm(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(" + server.getId() + " -> " + + follower.getPeer().getId() + ")"; + } + + @Override + public void run() { + try { + checkAndSendAppendEntries(); + } catch (InterruptedException | InterruptedIOException e) { + LOG.info(this + " was interrupted: " + e); + } + } + + protected boolean isAppenderRunning() { + return sending; + } + + public void stopSender() { + this.sending = false; + } + + public FollowerInfo getFollower() { + return follower; + } + + /** + * A buffer for log entries with size limitation. 
+ */ + private class LogEntryBuffer { + private final List<LogEntryProto> buf = new ArrayList<>(); + private int totalSize = 0; + + void addEntry(LogEntryProto entry) { + buf.add(entry); + totalSize += entry.getSerializedSize(); + } + + boolean isFull() { + return totalSize >= maxBufferSize; + } + + boolean isEmpty() { + return buf.isEmpty(); + } + + AppendEntriesRequestProto getAppendRequest(TermIndex previous) { + final AppendEntriesRequestProto request = server + .createAppendEntriesRequest(leaderTerm, follower.getPeer().getId(), + previous, buf, !follower.isAttendingVote()); + buf.clear(); + totalSize = 0; + return request; + } + + int getPendingEntryNum() { + return buf.size(); + } + } + + private TermIndex getPrevious() { + TermIndex previous = ServerProtoUtils.toTermIndex( + raftLog.get(follower.getNextIndex() - 1)); + if (previous == null) { + // if previous is null, nextIndex must be equal to the log start + // index (otherwise we will install snapshot). + Preconditions.checkState(follower.getNextIndex() == raftLog.getStartIndex(), + "follower's next index %s, local log start index %s", + follower.getNextIndex(), raftLog.getStartIndex()); + SnapshotInfo snapshot = server.getState().getLatestSnapshot(); + previous = snapshot == null ? 
null : snapshot.getTermIndex(); + } + return previous; + } + + protected AppendEntriesRequestProto createRequest() { + final TermIndex previous = getPrevious(); + final long leaderNext = raftLog.getNextIndex(); + long next = follower.getNextIndex() + buffer.getPendingEntryNum(); + boolean toSend = false; + + if (leaderNext == next && !buffer.isEmpty()) { + // no new entries, then send out the entries in the buffer + toSend = true; + } else if (leaderNext > next) { + while (leaderNext > next && !buffer.isFull()) { + // stop adding entry once the buffer size is >= the max size + buffer.addEntry(raftLog.get(next++)); + } + if (buffer.isFull() || !batchSending) { + // buffer is full or batch sending is disabled, send out a request + toSend = true; + } + } + + if (toSend || shouldHeartbeat()) { + return buffer.getAppendRequest(previous); + } + return null; + } + + /** Send an appendEntries RPC; retry indefinitely. */ + private AppendEntriesReplyProto sendAppendEntriesWithRetries() + throws InterruptedException, InterruptedIOException { + int retry = 0; + AppendEntriesRequestProto request = null; + while (isAppenderRunning()) { // keep retrying for IOException + try { + if (request == null || request.getEntriesCount() == 0) { + request = createRequest(); + } + + if (request == null) { + LOG.trace("{} need not send AppendEntries now." + + " Wait for more entries.", server.getId()); + return null; + } else if (!isAppenderRunning()) { + LOG.debug("LogAppender {} has been stopped. 
Skip the request.", this); + return null; + } + + follower.updateLastRpcSendTime(); + final AppendEntriesReplyProto r = server.getServerRpc() + .sendAppendEntries(request); + follower.updateLastRpcResponseTime(); + + return r; + } catch (InterruptedIOException iioe) { + throw iioe; + } catch (IOException ioe) { + LOG.debug(this + ": failed to send appendEntries; retry " + retry++, ioe); + } + if (isAppenderRunning()) { + Thread.sleep(leaderState.getSyncInterval()); + } + } + return null; + } + + protected class SnapshotRequestIter + implements Iterable<InstallSnapshotRequestProto> { + private final SnapshotInfo snapshot; + private final List<FileInfo> files; + private FileInputStream in; + private int fileIndex = 0; + + private FileInfo currentFileInfo; + private byte[] currentBuf; + private long currentFileSize; + private long currentOffset = 0; + private int chunkIndex = 0; + + private final String requestId; + private int requestIndex = 0; + + public SnapshotRequestIter(SnapshotInfo snapshot, String requestId) + throws IOException { + this.snapshot = snapshot; + this.requestId = requestId; + this.files = snapshot.getFiles(); + if (files.size() > 0) { + startReadFile(); + } + } + + private void startReadFile() throws IOException { + currentFileInfo = files.get(fileIndex); + File snapshotFile = currentFileInfo.getPath().toFile(); + currentFileSize = snapshotFile.length(); + final int bufLength = + (int) Math.min(leaderState.getSnapshotChunkMaxSize(), currentFileSize); + currentBuf = new byte[bufLength]; + currentOffset = 0; + chunkIndex = 0; + in = new FileInputStream(snapshotFile); + } + + @Override + public Iterator<InstallSnapshotRequestProto> iterator() { + return new Iterator<InstallSnapshotRequestProto>() { + @Override + public boolean hasNext() { + return fileIndex < files.size(); + } + + @Override + public InstallSnapshotRequestProto next() { + if (fileIndex >= files.size()) { + throw new NoSuchElementException(); + } + int targetLength = (int) 
Math.min(currentFileSize - currentOffset, + leaderState.getSnapshotChunkMaxSize()); + FileChunkProto chunk; + try { + chunk = readFileChunk(currentFileInfo, in, currentBuf, + targetLength, currentOffset, chunkIndex); + boolean done = (fileIndex == files.size() - 1) && + chunk.getDone(); + InstallSnapshotRequestProto request = + server.createInstallSnapshotRequest(follower.getPeer().getId(), + requestId, requestIndex++, snapshot, + Lists.newArrayList(chunk), done); + currentOffset += targetLength; + chunkIndex++; + + if (currentOffset >= currentFileSize) { + in.close(); + fileIndex++; + if (fileIndex < files.size()) { + startReadFile(); + } + } + + return request; + } catch (IOException e) { + if (in != null) { + try { + in.close(); + } catch (IOException ignored) { + } + } + LOG.warn("Got exception when preparing InstallSnapshot request", e); + throw new RuntimeException(e); + } + } + }; + } + } + + private FileChunkProto readFileChunk(FileInfo fileInfo, + FileInputStream in, byte[] buf, int length, long offset, int chunkIndex) + throws IOException { + FileChunkProto.Builder builder = FileChunkProto.newBuilder() + .setOffset(offset).setChunkIndex(chunkIndex); + IOUtils.readFully(in, buf, 0, length); + Path relativePath = server.getState().getStorage().getStorageDir() + .relativizeToRoot(fileInfo.getPath()); + builder.setFilename(relativePath.toString()); + builder.setDone(offset + length == fileInfo.getFileSize()); + builder.setFileDigest( + ByteString.copyFrom(fileInfo.getFileDigest().getDigest())); + builder.setData(ByteString.copyFrom(buf, 0, length)); + return builder.build(); + } + + private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) + throws InterruptedException, InterruptedIOException { + String requestId = UUID.randomUUID().toString(); + InstallSnapshotReplyProto reply = null; + try { + for (InstallSnapshotRequestProto request : + new SnapshotRequestIter(snapshot, requestId)) { + follower.updateLastRpcSendTime(); + reply = 
server.getServerRpc().sendInstallSnapshot(request); + follower.updateLastRpcResponseTime(); + + if (!reply.getServerReply().getSuccess()) { + return reply; + } + } + } catch (InterruptedIOException iioe) { + throw iioe; + } catch (Exception ioe) { + LOG.warn(this + ": failed to install SnapshotInfo " + snapshot.getFiles(), + ioe); + return null; + } + + if (reply != null) { + follower.updateMatchIndex(snapshot.getTermIndex().getIndex()); + follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1); + LOG.info("{}: install snapshot-{} successfully on follower {}", + server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer()); + } + return reply; + } + + protected SnapshotInfo shouldInstallSnapshot() { + final long logStartIndex = raftLog.getStartIndex(); + // we should install snapshot if the follower needs to catch up and: + // 1. there is no local log entry but there is snapshot + // 2. or the follower's next index is smaller than the log start index + if (follower.getNextIndex() < raftLog.getNextIndex()) { + SnapshotInfo snapshot = server.getState().getLatestSnapshot(); + if (follower.getNextIndex() < logStartIndex || + (logStartIndex == INVALID_LOG_INDEX && snapshot != null)) { + return snapshot; + } + } + return null; + } + + /** Check and send appendEntries RPC */ + private void checkAndSendAppendEntries() + throws InterruptedException, InterruptedIOException { + while (isAppenderRunning()) { + if (shouldSendRequest()) { + SnapshotInfo snapshot = shouldInstallSnapshot(); + if (snapshot != null) { + LOG.info("{}: follower {}'s next index is {}," + + " log's start index is {}, need to install snapshot", + server.getId(), follower.getPeer(), follower.getNextIndex(), + raftLog.getStartIndex()); + + final InstallSnapshotReplyProto r = installSnapshot(snapshot); + if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) { + checkResponseTerm(r.getTerm()); + } // otherwise if r is null, retry the snapshot installation + } else { + final 
AppendEntriesReplyProto r = sendAppendEntriesWithRetries(); + if (r != null) { + handleReply(r); + } + } + } + if (isAppenderRunning() && !shouldAppendEntries( + follower.getNextIndex() + buffer.getPendingEntryNum())) { + final long waitTime = getHeartbeatRemainingTime( + follower.getLastRpcTime()); + if (waitTime > 0) { + synchronized (this) { + wait(waitTime); + } + } + } + } + } + + private void handleReply(AppendEntriesReplyProto reply) { + if (reply != null) { + switch (reply.getResult()) { + case SUCCESS: + final long oldNextIndex = follower.getNextIndex(); + final long nextIndex = reply.getNextIndex(); + if (nextIndex < oldNextIndex) { + throw new IllegalStateException("nextIndex=" + nextIndex + + " < oldNextIndex=" + oldNextIndex + + ", reply=" + ProtoUtils.toString(reply)); + } + + if (nextIndex > oldNextIndex) { + follower.updateMatchIndex(nextIndex - 1); + follower.updateNextIndex(nextIndex); + submitEventOnSuccessAppend(); + } + break; + case NOT_LEADER: + // check if should step down + checkResponseTerm(reply.getTerm()); + break; + case INCONSISTENCY: + follower.decreaseNextIndex(reply.getNextIndex()); + break; + case UNRECOGNIZED: + LOG.warn("{} received UNRECOGNIZED AppendResult from {}", + server.getId(), follower.getPeer().getId()); + break; + } + } + } + + protected void submitEventOnSuccessAppend() { + LeaderState.StateUpdateEvent e = follower.isAttendingVote() ? + LeaderState.UPDATE_COMMIT_EVENT : + LeaderState.STAGING_PROGRESS_EVENT; + leaderState.submitUpdateStateEvent(e); + } + + public synchronized void notifyAppend() { + this.notify(); + } + + /** Should the leader send appendEntries RPC to this follower? 
*/ + protected boolean shouldSendRequest() { + return shouldAppendEntries(follower.getNextIndex()) || shouldHeartbeat(); + } + + private boolean shouldAppendEntries(long followerIndex) { + return followerIndex < raftLog.getNextIndex(); + } + + private boolean shouldHeartbeat() { + return getHeartbeatRemainingTime(follower.getLastRpcTime()) <= 0; + } + + /** + * @return the time in milliseconds that the leader should send a heartbeat. + */ + protected long getHeartbeatRemainingTime(Timestamp lastTime) { + return server.getMinTimeoutMs() / 2 - lastTime.elapsedTimeMs(); + } + + protected void checkResponseTerm(long responseTerm) { + synchronized (server) { + if (isAppenderRunning() && follower.isAttendingVote() + && responseTerm > leaderState.getCurrentTerm()) { + leaderState.submitUpdateStateEvent( + new LeaderState.StateUpdateEvent(StateUpdateEventType.STEPDOWN, + responseTerm)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java new file mode 100644 index 0000000..b5ed775 --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+/**
+ * Builds the {@link LogAppender} that a leader uses for one follower.
+ */
+public interface LogAppenderFactory {
+  /**
+   * Create a log appender.
+   *
+   * @param server the local raft server
+   * @param state the leader state handed to the appender
+   * @param f the follower the appender is created for
+   * @return a newly constructed {@link LogAppender}
+   */
+  LogAppender getLogAppender(RaftServer server, LeaderState state,
+      FollowerInfo f);
+
+  /** Factory that constructs a plain {@link LogAppender} directly. */
+  class SynchronousLogAppenderFactory
+      implements LogAppenderFactory {
+    @Override
+    public LogAppender getLogAppender(RaftServer server,
+        LeaderState state, FollowerInfo f) {
+      final LogAppender appender = new LogAppender(server, state, f);
+      return appender;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/PeerConfiguration.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/PeerConfiguration.java b/raft-server/src/main/java/org/apache/raft/server/impl/PeerConfiguration.java
new file mode 100644
index 0000000..774a0c5
--- /dev/null
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/PeerConfiguration.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.raft.protocol.RaftPeer;
+
+import java.util.*;
+
+/**
+ * The peer configuration of a raft cluster.
+ *
+ * The objects of this class are immutable.
+ */
+class PeerConfiguration {
+  /** Peers keyed by their id; unmodifiable after construction. */
+  private final Map<String, RaftPeer> peers;
+
+  /**
+   * @param peers the peers of this configuration. If two peers share an id,
+   *              the one iterated later replaces the earlier one.
+   * @throws NullPointerException if {@code peers} is null
+   * @throws IllegalStateException if {@code peers} is empty
+   */
+  PeerConfiguration(Iterable<RaftPeer> peers) {
+    Preconditions.checkNotNull(peers);
+    Map<String, RaftPeer> map = new HashMap<>();
+    for (RaftPeer p : peers) {
+      map.put(p.getId(), p);
+    }
+    this.peers = Collections.unmodifiableMap(map);
+    Preconditions.checkState(!this.peers.isEmpty());
+  }
+
+  /** @return an unmodifiable view of all the peers. */
+  Collection<RaftPeer> getPeers() {
+    // The collection views of an unmodifiable map are already unmodifiable.
+    return peers.values();
+  }
+
+  /** @return the number of distinct peer ids in this configuration. */
+  int size() {
+    return peers.size();
+  }
+
+  @Override
+  public String toString() {
+    return peers.values().toString();
+  }
+
+  /** @return the peer with the given id, or null if it is not a member. */
+  RaftPeer getPeer(String id) {
+    return peers.get(id);
+  }
+
+  /** @return true iff a peer with the given id is a member. */
+  boolean contains(String id) {
+    return peers.containsKey(id);
+  }
+
+  /**
+   * @param selfId the id to exclude
+   * @return a new mutable list of every peer whose id differs from
+   *         {@code selfId}
+   */
+  List<RaftPeer> getOtherPeers(String selfId) {
+    List<RaftPeer> others = new ArrayList<>();
+    for (RaftPeer p : peers.values()) {
+      if (!selfId.equals(p.getId())) {
+        others.add(p);
+      }
+    }
+    return others;
+  }
+
+  /**
+   * Do {@code selfId} and {@code others} together make up a majority of
+   * this configuration?
+   *
+   * @param others ids of the other peers to count; ids that are not members
+   *               of this configuration are ignored
+   * @param selfId the id of this server; counted only if it is a member
+   * @return true iff the number of counted members is strictly greater than
+   *         half of {@link #size()}
+   * @throws IllegalArgumentException if {@code others} contains
+   *         {@code selfId}
+   */
+  boolean hasMajority(Collection<String> others, String selfId) {
+    Preconditions.checkArgument(!others.contains(selfId));
+    int num = contains(selfId) ? 1 : 0;
+    for (String other : others) {
+      if (contains(other)) {
+        num++;
+      }
+      // Stop as soon as a majority has been reached.
+      if (num > size() / 2) {
+        return true;
+      }
+    }
+    // The loop never checks when 'others' is empty, e.g. a one-peer
+    // configuration containing only this server, which is a majority by
+    // itself.
+    return num > size() / 2;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequest.java b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequest.java
new file mode 100644
index 0000000..689566a
--- /dev/null
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.server.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.raft.protocol.Message;
+import org.apache.raft.protocol.RaftClientReply;
+import org.apache.raft.protocol.RaftClientRequest;
+import org.apache.raft.protocol.SetConfigurationRequest;
+import org.apache.raft.statemachine.TransactionContext;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A client request whose reply is delivered later through a
+ * {@link CompletableFuture}, tagged with a log index.
+ */
+public class PendingRequest implements Comparable<PendingRequest> {
+  /** Log index of the request; a primitive so compareTo never unboxes. */
+  private final long index;
+  private final RaftClientRequest request;
+  private final TransactionContext entry;
+  private final CompletableFuture<RaftClientReply> future;
+
+  PendingRequest(long index, RaftClientRequest request,
+                 TransactionContext entry) {
+    this.index = index;
+    this.request = request;
+    this.entry = entry;
+    this.future = new CompletableFuture<>();
+  }
+
+  /**
+   * A configuration-change request: it is tagged with
+   * {@link RaftServerConstants#INVALID_LOG_INDEX} and has no transaction.
+   */
+  PendingRequest(SetConfigurationRequest request) {
+    this(RaftServerConstants.INVALID_LOG_INDEX, request, null);
+  }
+
+  long getIndex() {
+    return index;
+  }
+
+  RaftClientRequest getRequest() {
+    return request;
+  }
+
+  /** @return the future completed by setReply or setException. */
+  public CompletableFuture<RaftClientReply> getFuture() {
+    return future;
+  }
+
+  TransactionContext getEntry() {
+    return entry;
+  }
+
+  /** Complete the future exceptionally; {@code e} must not be null. */
+  synchronized void setException(Throwable e) {
+    Preconditions.checkArgument(e != null);
+    future.completeExceptionally(e);
+  }
+
+  /** Complete the future with a reply; {@code r} must not be null. */
+  synchronized void setReply(RaftClientReply r) {
+    Preconditions.checkArgument(r != null);
+    future.complete(r);
+  }
+
+  /** Complete the future with a reply wrapping {@code message}. */
+  void setSuccessReply(Message message) {
+    setReply(new RaftClientReply(getRequest(), message));
+  }
+
+  /**
+   * Orders requests by log index. Note: this ordering is not consistent with
+   * equals, which is not overridden (identity).
+   */
+  @Override
+  public int compareTo(PendingRequest that) {
+    return Long.compare(this.index, that.index);
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "(index=" + index
+        + ", request=" + request + ")";
+  }
+}
