http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/LogAppender.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/LogAppender.java b/raft-server/src/main/java/org/apache/raft/server/LogAppender.java deleted file mode 100644 index 1d2983b..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/LogAppender.java +++ /dev/null @@ -1,481 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.commons.io.IOUtils; -import org.apache.raft.server.LeaderState.StateUpdateEventType; -import org.apache.raft.server.impl.ServerProtoUtils; -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.server.storage.FileInfo; -import org.apache.raft.server.storage.RaftLog; -import org.apache.raft.shaded.com.google.protobuf.ByteString; -import org.apache.raft.shaded.proto.RaftProtos.*; -import org.apache.raft.statemachine.SnapshotInfo; -import org.apache.raft.util.Daemon; -import org.apache.raft.util.ProtoUtils; -import org.apache.raft.util.Timestamp; -import org.slf4j.Logger; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.nio.file.Path; -import java.util.*; - -import static org.apache.raft.server.RaftServerConfigKeys.*; -import static org.apache.raft.server.RaftServerConstants.INVALID_LOG_INDEX; - -/** - * A daemon thread appending log entries to a follower peer. 
- */ -public class LogAppender extends Daemon { - public static final Logger LOG = RaftServer.LOG; - - protected final RaftServer server; - private final LeaderState leaderState; - protected final RaftLog raftLog; - protected final FollowerInfo follower; - private final int maxBufferSize; - private final boolean batchSending; - private final LogEntryBuffer buffer; - private final long leaderTerm; - - private volatile boolean sending = true; - - public LogAppender(RaftServer server, LeaderState leaderState, FollowerInfo f) { - this.follower = f; - this.server = server; - this.leaderState = leaderState; - this.raftLog = server.getState().getLog(); - this.maxBufferSize = server.getProperties().getInt( - RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY, - RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT); - this.batchSending = server.getProperties().getBoolean( - RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY, - RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT); - this.buffer = new LogEntryBuffer(); - this.leaderTerm = server.getState().getCurrentTerm(); - } - - @Override - public String toString() { - return getClass().getSimpleName() + "(" + server.getId() + " -> " + - follower.getPeer().getId() + ")"; - } - - @Override - public void run() { - try { - checkAndSendAppendEntries(); - } catch (InterruptedException | InterruptedIOException e) { - LOG.info(this + " was interrupted: " + e); - } - } - - protected boolean isAppenderRunning() { - return sending; - } - - public void stopSender() { - this.sending = false; - } - - public FollowerInfo getFollower() { - return follower; - } - - /** - * A buffer for log entries with size limitation. 
- */ - private class LogEntryBuffer { - private final List<LogEntryProto> buf = new ArrayList<>(); - private int totalSize = 0; - - void addEntry(LogEntryProto entry) { - buf.add(entry); - totalSize += entry.getSerializedSize(); - } - - boolean isFull() { - return totalSize >= maxBufferSize; - } - - boolean isEmpty() { - return buf.isEmpty(); - } - - AppendEntriesRequestProto getAppendRequest(TermIndex previous) { - final AppendEntriesRequestProto request = server - .createAppendEntriesRequest(leaderTerm, follower.getPeer().getId(), - previous, buf, !follower.isAttendingVote()); - buf.clear(); - totalSize = 0; - return request; - } - - int getPendingEntryNum() { - return buf.size(); - } - } - - private TermIndex getPrevious() { - TermIndex previous = ServerProtoUtils.toTermIndex( - raftLog.get(follower.getNextIndex() - 1)); - if (previous == null) { - // if previous is null, nextIndex must be equal to the log start - // index (otherwise we will install snapshot). - Preconditions.checkState(follower.getNextIndex() == raftLog.getStartIndex(), - "follower's next index %s, local log start index %s", - follower.getNextIndex(), raftLog.getStartIndex()); - SnapshotInfo snapshot = server.getState().getLatestSnapshot(); - previous = snapshot == null ? 
null : snapshot.getTermIndex(); - } - return previous; - } - - protected AppendEntriesRequestProto createRequest() { - final TermIndex previous = getPrevious(); - final long leaderNext = raftLog.getNextIndex(); - long next = follower.getNextIndex() + buffer.getPendingEntryNum(); - boolean toSend = false; - - if (leaderNext == next && !buffer.isEmpty()) { - // no new entries, then send out the entries in the buffer - toSend = true; - } else if (leaderNext > next) { - while (leaderNext > next && !buffer.isFull()) { - // stop adding entry once the buffer size is >= the max size - buffer.addEntry(raftLog.get(next++)); - } - if (buffer.isFull() || !batchSending) { - // buffer is full or batch sending is disabled, send out a request - toSend = true; - } - } - - if (toSend || shouldHeartbeat()) { - return buffer.getAppendRequest(previous); - } - return null; - } - - /** Send an appendEntries RPC; retry indefinitely. */ - private AppendEntriesReplyProto sendAppendEntriesWithRetries() - throws InterruptedException, InterruptedIOException { - int retry = 0; - AppendEntriesRequestProto request = null; - while (isAppenderRunning()) { // keep retrying for IOException - try { - if (request == null || request.getEntriesCount() == 0) { - request = createRequest(); - } - - if (request == null) { - LOG.trace("{} need not send AppendEntries now." + - " Wait for more entries.", server.getId()); - return null; - } else if (!isAppenderRunning()) { - LOG.debug("LogAppender {} has been stopped. 
Skip the request.", this); - return null; - } - - follower.updateLastRpcSendTime(); - final AppendEntriesReplyProto r = server.getServerRpc() - .sendAppendEntries(request); - follower.updateLastRpcResponseTime(); - - return r; - } catch (InterruptedIOException iioe) { - throw iioe; - } catch (IOException ioe) { - LOG.debug(this + ": failed to send appendEntries; retry " + retry++, ioe); - } - if (isAppenderRunning()) { - Thread.sleep(leaderState.getSyncInterval()); - } - } - return null; - } - - protected class SnapshotRequestIter - implements Iterable<InstallSnapshotRequestProto> { - private final SnapshotInfo snapshot; - private final List<FileInfo> files; - private FileInputStream in; - private int fileIndex = 0; - - private FileInfo currentFileInfo; - private byte[] currentBuf; - private long currentFileSize; - private long currentOffset = 0; - private int chunkIndex = 0; - - private final String requestId; - private int requestIndex = 0; - - public SnapshotRequestIter(SnapshotInfo snapshot, String requestId) - throws IOException { - this.snapshot = snapshot; - this.requestId = requestId; - this.files = snapshot.getFiles(); - if (files.size() > 0) { - startReadFile(); - } - } - - private void startReadFile() throws IOException { - currentFileInfo = files.get(fileIndex); - File snapshotFile = currentFileInfo.getPath().toFile(); - currentFileSize = snapshotFile.length(); - final int bufLength = - (int) Math.min(leaderState.getSnapshotChunkMaxSize(), currentFileSize); - currentBuf = new byte[bufLength]; - currentOffset = 0; - chunkIndex = 0; - in = new FileInputStream(snapshotFile); - } - - @Override - public Iterator<InstallSnapshotRequestProto> iterator() { - return new Iterator<InstallSnapshotRequestProto>() { - @Override - public boolean hasNext() { - return fileIndex < files.size(); - } - - @Override - public InstallSnapshotRequestProto next() { - if (fileIndex >= files.size()) { - throw new NoSuchElementException(); - } - int targetLength = (int) 
Math.min(currentFileSize - currentOffset, - leaderState.getSnapshotChunkMaxSize()); - FileChunkProto chunk; - try { - chunk = readFileChunk(currentFileInfo, in, currentBuf, - targetLength, currentOffset, chunkIndex); - boolean done = (fileIndex == files.size() - 1) && - chunk.getDone(); - InstallSnapshotRequestProto request = - server.createInstallSnapshotRequest(follower.getPeer().getId(), - requestId, requestIndex++, snapshot, - Lists.newArrayList(chunk), done); - currentOffset += targetLength; - chunkIndex++; - - if (currentOffset >= currentFileSize) { - in.close(); - fileIndex++; - if (fileIndex < files.size()) { - startReadFile(); - } - } - - return request; - } catch (IOException e) { - if (in != null) { - try { - in.close(); - } catch (IOException ignored) { - } - } - LOG.warn("Got exception when preparing InstallSnapshot request", e); - throw new RuntimeException(e); - } - } - }; - } - } - - private FileChunkProto readFileChunk(FileInfo fileInfo, - FileInputStream in, byte[] buf, int length, long offset, int chunkIndex) - throws IOException { - FileChunkProto.Builder builder = FileChunkProto.newBuilder() - .setOffset(offset).setChunkIndex(chunkIndex); - IOUtils.readFully(in, buf, 0, length); - Path relativePath = server.getState().getStorage().getStorageDir() - .relativizeToRoot(fileInfo.getPath()); - builder.setFilename(relativePath.toString()); - builder.setDone(offset + length == fileInfo.getFileSize()); - builder.setFileDigest( - ByteString.copyFrom(fileInfo.getFileDigest().getDigest())); - builder.setData(ByteString.copyFrom(buf, 0, length)); - return builder.build(); - } - - private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) - throws InterruptedException, InterruptedIOException { - String requestId = UUID.randomUUID().toString(); - InstallSnapshotReplyProto reply = null; - try { - for (InstallSnapshotRequestProto request : - new SnapshotRequestIter(snapshot, requestId)) { - follower.updateLastRpcSendTime(); - reply = 
server.getServerRpc().sendInstallSnapshot(request); - follower.updateLastRpcResponseTime(); - - if (!reply.getServerReply().getSuccess()) { - return reply; - } - } - } catch (InterruptedIOException iioe) { - throw iioe; - } catch (Exception ioe) { - LOG.warn(this + ": failed to install SnapshotInfo " + snapshot.getFiles(), - ioe); - return null; - } - - if (reply != null) { - follower.updateMatchIndex(snapshot.getTermIndex().getIndex()); - follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1); - LOG.info("{}: install snapshot-{} successfully on follower {}", - server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer()); - } - return reply; - } - - protected SnapshotInfo shouldInstallSnapshot() { - final long logStartIndex = raftLog.getStartIndex(); - // we should install snapshot if the follower needs to catch up and: - // 1. there is no local log entry but there is snapshot - // 2. or the follower's next index is smaller than the log start index - if (follower.getNextIndex() < raftLog.getNextIndex()) { - SnapshotInfo snapshot = server.getState().getLatestSnapshot(); - if (follower.getNextIndex() < logStartIndex || - (logStartIndex == INVALID_LOG_INDEX && snapshot != null)) { - return snapshot; - } - } - return null; - } - - /** Check and send appendEntries RPC */ - private void checkAndSendAppendEntries() - throws InterruptedException, InterruptedIOException { - while (isAppenderRunning()) { - if (shouldSendRequest()) { - SnapshotInfo snapshot = shouldInstallSnapshot(); - if (snapshot != null) { - LOG.info("{}: follower {}'s next index is {}," + - " log's start index is {}, need to install snapshot", - server.getId(), follower.getPeer(), follower.getNextIndex(), - raftLog.getStartIndex()); - - final InstallSnapshotReplyProto r = installSnapshot(snapshot); - if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) { - checkResponseTerm(r.getTerm()); - } // otherwise if r is null, retry the snapshot installation - } else { - final 
AppendEntriesReplyProto r = sendAppendEntriesWithRetries(); - if (r != null) { - handleReply(r); - } - } - } - if (isAppenderRunning() && !shouldAppendEntries( - follower.getNextIndex() + buffer.getPendingEntryNum())) { - final long waitTime = getHeartbeatRemainingTime( - follower.getLastRpcTime()); - if (waitTime > 0) { - synchronized (this) { - wait(waitTime); - } - } - } - } - } - - private void handleReply(AppendEntriesReplyProto reply) { - if (reply != null) { - switch (reply.getResult()) { - case SUCCESS: - final long oldNextIndex = follower.getNextIndex(); - final long nextIndex = reply.getNextIndex(); - if (nextIndex < oldNextIndex) { - throw new IllegalStateException("nextIndex=" + nextIndex - + " < oldNextIndex=" + oldNextIndex - + ", reply=" + ProtoUtils.toString(reply)); - } - - if (nextIndex > oldNextIndex) { - follower.updateMatchIndex(nextIndex - 1); - follower.updateNextIndex(nextIndex); - submitEventOnSuccessAppend(); - } - break; - case NOT_LEADER: - // check if should step down - checkResponseTerm(reply.getTerm()); - break; - case INCONSISTENCY: - follower.decreaseNextIndex(reply.getNextIndex()); - break; - case UNRECOGNIZED: - LOG.warn("{} received UNRECOGNIZED AppendResult from {}", - server.getId(), follower.getPeer().getId()); - break; - } - } - } - - protected void submitEventOnSuccessAppend() { - LeaderState.StateUpdateEvent e = follower.isAttendingVote() ? - LeaderState.UPDATE_COMMIT_EVENT : - LeaderState.STAGING_PROGRESS_EVENT; - leaderState.submitUpdateStateEvent(e); - } - - public synchronized void notifyAppend() { - this.notify(); - } - - /** Should the leader send appendEntries RPC to this follower? 
*/ - protected boolean shouldSendRequest() { - return shouldAppendEntries(follower.getNextIndex()) || shouldHeartbeat(); - } - - private boolean shouldAppendEntries(long followerIndex) { - return followerIndex < raftLog.getNextIndex(); - } - - private boolean shouldHeartbeat() { - return getHeartbeatRemainingTime(follower.getLastRpcTime()) <= 0; - } - - /** - * @return the time in milliseconds that the leader should send a heartbeat. - */ - protected long getHeartbeatRemainingTime(Timestamp lastTime) { - return server.getMinTimeoutMs() / 2 - lastTime.elapsedTimeMs(); - } - - protected void checkResponseTerm(long responseTerm) { - synchronized (server) { - if (isAppenderRunning() && follower.isAttendingVote() - && responseTerm > leaderState.getCurrentTerm()) { - leaderState.submitUpdateStateEvent( - new LeaderState.StateUpdateEvent(StateUpdateEventType.STEPDOWN, - responseTerm)); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/LogAppenderFactory.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/LogAppenderFactory.java b/raft-server/src/main/java/org/apache/raft/server/LogAppenderFactory.java deleted file mode 100644 index 3cb2b06..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/LogAppenderFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server; -
/** Creates the {@link LogAppender} that a leader uses for one follower. */
public interface LogAppenderFactory {
  /**
   * @param server the local server
   * @param state the leader state the appender reports to
   * @param f the follower the appender sends requests to
   * @return an appender for the given follower
   */
  LogAppender getLogAppender(RaftServer server, LeaderState state,
      FollowerInfo f);

  /** A factory that hands out plain {@link LogAppender} instances. */
  class SynchronousLogAppenderFactory implements LogAppenderFactory {
    @Override
    public LogAppender getLogAppender(final RaftServer server,
        final LeaderState state, final FollowerInfo f) {
      final LogAppender appender = new LogAppender(server, state, f);
      return appender;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/PeerConfiguration.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/PeerConfiguration.java b/raft-server/src/main/java/org/apache/raft/server/PeerConfiguration.java deleted file mode 100644 index 9f01390..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/PeerConfiguration.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server; - -import com.google.common.base.Preconditions; -import org.apache.raft.protocol.RaftPeer; - -import java.util.*; - -/** - * The peer configuration of a raft cluster. - * - * The objects of this class are immutable. 
- */
class PeerConfiguration {
  /** Peers keyed by their id; wrapped as unmodifiable in the constructor. */
  private final Map<String, RaftPeer> peers;

  /**
   * @param peers the members of this configuration; must be non-null and
   *              contain at least one peer. Peers sharing an id collapse to
   *              the last one seen.
   */
  PeerConfiguration(Iterable<RaftPeer> peers) {
    Preconditions.checkNotNull(peers);
    Map<String, RaftPeer> map = new HashMap<>();
    for (RaftPeer p : peers) {
      map.put(p.getId(), p);
    }
    this.peers = Collections.unmodifiableMap(map);
    Preconditions.checkState(!this.peers.isEmpty());
  }

  /** @return an unmodifiable view of all peers. */
  Collection<RaftPeer> getPeers() {
    return Collections.unmodifiableCollection(peers.values());
  }

  int size() {
    return peers.size();
  }

  @Override
  public String toString() {
    return peers.values().toString();
  }

  /** @return the peer with the given id, or null if there is none. */
  RaftPeer getPeer(String id) {
    return peers.get(id);
  }

  boolean contains(String id) {
    return peers.containsKey(id);
  }

  /** @return a new list of every peer whose id differs from {@code selfId}. */
  List<RaftPeer> getOtherPeers(String selfId) {
    List<RaftPeer> others = new ArrayList<>();
    for (RaftPeer peer : peers.values()) {
      if (!selfId.equals(peer.getId())) {
        others.add(peer);
      }
    }
    return others;
  }

  /**
   * Checks whether the given ids, together with {@code selfId}, form a
   * majority of this configuration. Ids that are not members are not counted.
   *
   * @param others ids other than {@code selfId}; must not contain it
   * @param selfId the id of the local peer
   * @return true if the counted members number more than half of
   *         {@link #size()}
   */
  boolean hasMajority(Collection<String> others, String selfId) {
    Preconditions.checkArgument(!others.contains(selfId));
    int num = 0;
    if (contains(selfId)) {
      num++;
    }
    for (String other : others) {
      if (contains(other)) {
        num++;
      }
    }
    // Compare after the loop so that an empty "others" is handled too: in a
    // single-peer configuration the self peer alone is already a majority.
    return num > size() / 2;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/PendingRequest.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/PendingRequest.java b/raft-server/src/main/java/org/apache/raft/server/PendingRequest.java deleted file mode 100644 index 3598349..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/PendingRequest.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server; - -import com.google.common.base.Preconditions; -import org.apache.raft.protocol.Message; -import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.protocol.RaftClientRequest; -import org.apache.raft.protocol.SetConfigurationRequest; -import org.apache.raft.statemachine.TransactionContext; - -import java.util.concurrent.CompletableFuture; -
/**
 * A client request paired with the future through which its reply (or
 * failure) is delivered. Instances are ordered by their index.
 */
public class PendingRequest implements Comparable<PendingRequest> {
  // a primitive avoids boxing on every getIndex()/compareTo() call
  private final long index;
  private final RaftClientRequest request;
  private final TransactionContext entry;
  private final CompletableFuture<RaftClientReply> future;

  PendingRequest(long index, RaftClientRequest request,
      TransactionContext entry) {
    this.index = index;
    this.request = request;
    this.entry = entry;
    this.future = new CompletableFuture<>();
  }

  /**
   * Creates a pending setConfiguration request; it uses
   * {@link RaftServerConstants#INVALID_LOG_INDEX} as index and has no entry.
   */
  PendingRequest(SetConfigurationRequest request) {
    this(RaftServerConstants.INVALID_LOG_INDEX, request, null);
  }

  long getIndex() {
    return index;
  }

  RaftClientRequest getRequest() {
    return request;
  }

  /** @return the future completed by {@link #setReply} or {@link #setException}. */
  public CompletableFuture<RaftClientReply> getFuture() {
    return future;
  }

  /** @return the transaction context given at construction; may be null. */
  TransactionContext getEntry() {
    return entry;
  }

  /** Completes the future exceptionally; {@code e} must not be null. */
  synchronized void setException(Throwable e) {
    Preconditions.checkArgument(e != null);
    future.completeExceptionally(e);
  }

  /** Completes the future with the given reply; {@code r} must not be null. */
  synchronized void setReply(RaftClientReply r) {
    Preconditions.checkArgument(r != null);
    future.complete(r);
  }

  /** Completes the future with a reply built from the request and {@code message}. */
  void setSuccessReply(Message message) {
    setReply(new RaftClientReply(getRequest(), message));
  }

  @Override
  public int compareTo(PendingRequest that) {
    return Long.compare(this.index, that.index);
  }

  @Override
  public String toString() {
    // the closing parenthesis was missing from the original output
    return getClass().getSimpleName() + "(index=" + index
        + ", request=" + request + ")";
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/PendingRequests.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/PendingRequests.java b/raft-server/src/main/java/org/apache/raft/server/PendingRequests.java deleted file mode 100644 index a5731fd..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/PendingRequests.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server; - -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; - -import org.apache.raft.protocol.Message; -import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.protocol.RaftClientRequest; -import org.apache.raft.protocol.RaftException; -import org.apache.raft.protocol.SetConfigurationRequest; -import org.apache.raft.statemachine.TransactionContext; -import org.slf4j.Logger; - -import com.google.common.base.Preconditions; -
/**
 * Tracks client requests that have not been replied to yet: the write
 * requests keyed by log index, plus at most one setConfiguration request.
 */
class PendingRequests {
  private static final Logger LOG = RaftServer.LOG;

  /** The outstanding setConfiguration request, or null if there is none. */
  private PendingRequest pendingSetConf;
  private final RaftServer server;
  private final ConcurrentMap<Long, PendingRequest> pendingRequests = new ConcurrentHashMap<>();
  /** The most recently added request; used to check that indices are consecutive. */
  private PendingRequest last = null;

  PendingRequests(RaftServer server) {
    this.server = server;
  }

  /**
   * Registers a write request.
   *
   * @param index the log index of the request; must directly follow the index
   *              of the previously added request, if any
   * @param request the client request; must not be read-only
   * @param entry the transaction context kept with the request
   * @return the newly created pending request
   */
  PendingRequest addPendingRequest(long index, RaftClientRequest request,
      TransactionContext entry) {
    // externally synced for now
    Preconditions.checkArgument(!request.isReadOnly());
    Preconditions.checkState(last == null || index == last.getIndex() + 1);
    return add(index, request, entry);
  }

  private PendingRequest add(long index, RaftClientRequest request,
      TransactionContext entry) {
    final PendingRequest pending = new PendingRequest(index, request, entry);
    pendingRequests.put(index, pending);
    last = pending;
    return pending;
  }

  /** Registers the setConfiguration request; only one may be outstanding. */
  PendingRequest addConfRequest(SetConfigurationRequest request) {
    Preconditions.checkState(pendingSetConf == null);
    pendingSetConf = new PendingRequest(request);
    return pendingSetConf;
  }

  void replySetConfiguration() {
    // we allow the pendingRequest to be null in case that the new leader
    // commits the new configuration while it has not received the retry
    // request from the client
    if (pendingSetConf != null) {
      // for setConfiguration we do not need to wait for statemachine. send back
      // reply after it's committed.
      pendingSetConf.setSuccessReply(null);
      pendingSetConf = null;
    }
  }

  /** Fails the outstanding setConfiguration request, which must exist. */
  void failSetConfiguration(RaftException e) {
    Preconditions.checkState(pendingSetConf != null);
    pendingSetConf.setException(e);
    pendingSetConf = null;
  }

  TransactionContext getTransactionContext(long index) {
    PendingRequest pendingRequest = pendingRequests.get(index);
    // it is possible that the pendingRequest is null if this peer just becomes
    // the new leader and commits transactions received by the previous leader
    return pendingRequest != null ? pendingRequest.getEntry() : null;
  }

  /**
   * Hands the request at {@code index} over to {@code messageFuture}: the
   * request is completed (successfully or exceptionally) when the future
   * completes. Does nothing if no request is registered for the index.
   */
  void replyPendingRequest(long index, CompletableFuture<Message> messageFuture) {
    // remove(), not get(): once the reply is tied to the future the request is
    // no longer pending. Keeping it would let the map grow without bound and
    // would make sendNotLeaderResponses() report already-applied entries.
    final PendingRequest pending = pendingRequests.remove(index);
    if (pending != null) {
      Preconditions.checkState(pending.getIndex() == index);

      messageFuture.whenComplete((reply, exception) -> {
        if (exception == null) {
          pending.setSuccessReply(reply);
        } else {
          pending.setException(exception);
        }
      });
    }
  }

  /**
   * The leader state is stopped. Send NotLeaderException to all the pending
   * requests since they have not got applied to the state machine yet.
   */
  void sendNotLeaderResponses() throws IOException {
    LOG.info("{} sends responses before shutting down PendingRequestsHandler",
        server.getId());

    Collection<TransactionContext> pendingEntries = pendingRequests.values().stream()
        .map(PendingRequest::getEntry).collect(Collectors.toList());
    // notify the state machine about stepping down
    server.getStateMachine().notifyNotLeader(pendingEntries);
    pendingRequests.values().forEach(this::setNotLeaderException);
    if (pendingSetConf != null) {
      setNotLeaderException(pendingSetConf);
    }
  }

  /** Replies to the request with the server's NotLeaderException. */
  private void setNotLeaderException(PendingRequest pending) {
    RaftClientReply reply = new RaftClientReply(pending.getRequest(),
        server.generateNotLeaderException());
    pending.setReply(reply);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftConfiguration.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftConfiguration.java b/raft-server/src/main/java/org/apache/raft/server/RaftConfiguration.java deleted file mode 100644 index 54ed9d6..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/RaftConfiguration.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.raft.protocol.RaftPeer; - -import java.util.*; -import java.util.concurrent.ThreadLocalRandom; - -/** - * The configuration of the raft cluster. - * - * The configuration is stable if there is no on-going peer change. Otherwise, - * the configuration is transitional, i.e. in the middle of a peer change. - * - * The objects of this class are immutable. - */
public class RaftConfiguration {
  /** @return a new, empty {@link Builder}. */
  public static Builder newBuilder() {
    return new Builder();
  }

  /**
   * Assembles {@link RaftConfiguration} objects. Each of conf, oldConf and
   * logEntryIndex may be set at most once.
   */
  public static class Builder {
    private PeerConfiguration oldConf;
    private PeerConfiguration conf;
    private long logEntryIndex = RaftServerConstants.INVALID_LOG_INDEX;

    /** Set when conf was taken from a transitional configuration. */
    private boolean forceStable = false;
    /** Set when oldConf was taken from a stable configuration. */
    private boolean forceTransitional = false;

    private Builder() {}

    /** Sets the (new) peer configuration; fails if it was set before. */
    public Builder setConf(PeerConfiguration conf) {
      Preconditions.checkNotNull(conf);
      final boolean notYetSet = this.conf == null;
      Preconditions.checkState(notYetSet, "conf is already set.");
      this.conf = conf;
      return this;
    }

    public Builder setConf(Iterable<RaftPeer> peers) {
      final PeerConfiguration peerConf = new PeerConfiguration(peers);
      return setConf(peerConf);
    }

    public Builder setConf(RaftPeer[] peers) {
      final List<RaftPeer> peerList = Arrays.asList(peers);
      return setConf(peerList);
    }

    /**
     * Uses the conf of a transitional configuration as the conf of the one
     * being built, and requires the result to be stable.
     */
    Builder setConf(RaftConfiguration transitionalConf) {
      Preconditions.checkNotNull(transitionalConf);
      Preconditions.checkState(transitionalConf.isTransitional());

      Preconditions.checkState(!forceTransitional);
      forceStable = true;
      final PeerConfiguration newConf = transitionalConf.conf;
      return setConf(newConf);
    }

    /** Sets the old peer configuration; fails if it was set before. */
    public Builder setOldConf(PeerConfiguration oldConf) {
      Preconditions.checkNotNull(oldConf);
      final boolean notYetSet = this.oldConf == null;
      Preconditions.checkState(notYetSet, "oldConf is already set.");
      this.oldConf = oldConf;
      return this;
    }

    public Builder setOldConf(Iterable<RaftPeer> oldPeers) {
      final PeerConfiguration peerConf = new PeerConfiguration(oldPeers);
      return setOldConf(peerConf);
    }

    public Builder setOldConf(RaftPeer[] oldPeers) {
      final List<RaftPeer> peerList = Arrays.asList(oldPeers);
      return setOldConf(peerList);
    }

    /**
     * Uses the conf of a stable configuration as the old conf of the one
     * being built, and requires the result to be transitional.
     */
    Builder setOldConf(RaftConfiguration stableConf) {
      Preconditions.checkNotNull(stableConf);
      Preconditions.checkState(stableConf.isStable());

      Preconditions.checkState(!forceStable);
      forceTransitional = true;
      final PeerConfiguration previous = stableConf.conf;
      return setOldConf(previous);
    }

    /** Sets the log entry index; it must be valid and not have been set before. */
    public Builder setLogEntryIndex(long logEntryIndex) {
      final long invalid = RaftServerConstants.INVALID_LOG_INDEX;
      Preconditions.checkArgument(logEntryIndex != invalid);
      Preconditions.checkState(this.logEntryIndex == invalid,
          "logEntryIndex is already set.");
      this.logEntryIndex = logEntryIndex;
      return this;
    }

    /** Build a {@link RaftConfiguration}. */
    public RaftConfiguration build() {
      final boolean hasOldConf = oldConf != null;
      if (forceTransitional) {
        Preconditions.checkState(hasOldConf);
      }
      if (forceStable) {
        Preconditions.checkState(!hasOldConf);
      }
      return new RaftConfiguration(conf, oldConf, logEntryIndex);
    }
  }

  /** Non-null only if this configuration is transitional. */
  private final PeerConfiguration oldConf;
  /**
   * The current peer configuration while this configuration is stable;
   * or the new peer configuration while this configuration is transitional.
   */
  private final PeerConfiguration conf;

  /** The index of the corresponding log entry for this configuration. */
  private final long logEntryIndex;

  private RaftConfiguration(PeerConfiguration conf, PeerConfiguration oldConf,
      long logEntryIndex) {
    Preconditions.checkNotNull(conf);
    this.conf = conf;
    this.oldConf = oldConf;
    this.logEntryIndex = logEntryIndex;
  }

  /** Is this configuration transitional, i.e. in the middle of a peer change? */
  public boolean isTransitional() {
    return oldConf != null;
  }

  /** Is this configuration stable, i.e. no on-going peer change? */
  public boolean isStable() {
    return !isTransitional();
  }

  boolean containsInConf(String peerId) {
    return conf.contains(peerId);
  }

  boolean containsInOldConf(String peerId) {
    if (oldConf == null) {
      return false;
    }
    return oldConf.contains(peerId);
  }

  /**
   * @return true if the peer is in the conf and, when an old conf exists,
   *         in the old conf as well.
   */
  public boolean contains(String peerId) {
    if (!containsInConf(peerId)) {
      return false;
    }
    return oldConf == null || oldConf.contains(peerId);
  }

  /**
   * @return the peer corresponding to the given id;
   *         or return null if the peer is not in this configuration.
   */
  public RaftPeer getPeer(String id) {
    if (id == null) {
      return null;
    }
    final RaftPeer current = conf.getPeer(id);
    if (current != null) {
      return current;
    }
    return oldConf == null ? null : oldConf.getPeer(id);
  }

  /** @return all the peers from the conf, and the old conf if it exists. */
  public Collection<RaftPeer> getPeers() {
    final Collection<RaftPeer> all = new ArrayList<>(conf.getPeers());
    if (oldConf != null) {
      for (RaftPeer p : oldConf.getPeers()) {
        if (!all.contains(p)) {
          all.add(p);
        }
      }
    }
    return all;
  }

  /**
   * @return all the peers other than the given self id from the conf,
   *         and the old conf if it exists.
   */
  public Collection<RaftPeer> getOtherPeers(String selfId) {
    final Collection<RaftPeer> result = conf.getOtherPeers(selfId);
    if (oldConf != null) {
      for (RaftPeer p : oldConf.getOtherPeers(selfId)) {
        if (!result.contains(p)) {
          result.add(p);
        }
      }
    }
    return result;
  }

  /** @return true if the self id together with the others are in the majority. */
  public boolean hasMajority(Collection<String> others, String selfId) {
    Preconditions.checkArgument(!others.contains(selfId));
    if (!conf.hasMajority(others, selfId)) {
      return false;
    }
    // while transitional, the old configuration must agree as well
    return oldConf == null || oldConf.hasMajority(others, selfId);
  }

  @Override
  public String toString() {
    final StringBuilder b = new StringBuilder().append(conf);
    if (oldConf != null) {
      b.append("old:").append(oldConf);
    }
    return b.toString();
  }

  /**
   * @return true if this configuration is stable, has as many peers as
   *         {@code newMembers}, and contains the id of each of them.
   */
  @VisibleForTesting
  boolean hasNoChange(RaftPeer[] newMembers) {
    if (isTransitional() || conf.size() != newMembers.length) {
      return false;
    }
    return Arrays.stream(newMembers)
        .allMatch(member -> conf.contains(member.getId()));
  }

  long getLogEntryIndex() {
    return logEntryIndex;
  }

  /** @return the members of {@code newMembers} that are not in the conf of {@code old}. */
  static Collection<RaftPeer> computeNewPeers(RaftPeer[] newMembers,
      RaftConfiguration old) {
    final List<RaftPeer> added = new ArrayList<>();
    Arrays.stream(newMembers)
        .filter(member -> !old.containsInConf(member.getId()))
        .forEach(added::add);
    return added;
  }

  /**
   * @return a peer chosen at random from the conf, excluding the given id;
   *         or null if there is no other peer.
   */
  RaftPeer getRandomPeer(String exclusiveId) {
    final List<RaftPeer> candidates = conf.getOtherPeers(exclusiveId);
    return candidates.isEmpty() ? null
        : candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
  }

  /** @return the peers of the old conf, or an empty list if there is none. */
  public Collection<RaftPeer> getPeersInOldConf() {
    if (oldConf == null) {
      return Collections.emptyList();
    }
    return oldConf.getPeers();
  }

  public Collection<RaftPeer> getPeersInConf() {
    return conf.getPeers();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java b/raft-server/src/main/java/org/apache/raft/server/RaftServer.java deleted file mode 100644 index c7ea6b2..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java +++ /dev/null @@ -1,750 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.*; -import org.apache.raft.server.protocol.RaftServerProtocol; -import org.apache.raft.server.impl.ServerProtoUtils; -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.server.storage.FileInfo; -import org.apache.raft.shaded.proto.RaftProtos.*; -import org.apache.raft.statemachine.SnapshotInfo; -import org.apache.raft.statemachine.StateMachine; -import org.apache.raft.statemachine.TransactionContext; -import org.apache.raft.util.CodeInjectionForTesting; -import org.apache.raft.util.LifeCycle; -import org.apache.raft.util.ProtoUtils; -import org.apache.raft.util.RaftUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.OptionalLong; -import java.util.concurrent.CompletableFuture; - -import static org.apache.raft.server.LeaderState.UPDATE_COMMIT_EVENT; -import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; -import static org.apache.raft.util.LifeCycle.State.*; - -public class RaftServer implements RaftServerProtocol, Closeable { - public static final Logger LOG = LoggerFactory.getLogger(RaftServer.class); - - private static final String CLASS_NAME = 
RaftServer.class.getSimpleName(); - static final String REQUEST_VOTE = CLASS_NAME + ".requestVote"; - static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries"; - static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot"; - - - private final int minTimeoutMs; - private final int maxTimeoutMs; - - private final LifeCycle lifeCycle; - private final ServerState state; - private final StateMachine stateMachine; - private final RaftProperties properties; - private volatile Role role; - - /** used when the peer is follower, to monitor election timeout */ - private volatile FollowerState heartbeatMonitor; - - /** used when the peer is candidate, to request votes from other peers */ - private volatile LeaderElection electionDaemon; - - /** used when the peer is leader */ - private volatile LeaderState leaderState; - - private RaftServerRpc serverRpc; - - private final LogAppenderFactory appenderFactory; - - public RaftServer(String id, RaftConfiguration raftConf, - RaftProperties properties, StateMachine stateMachine) throws IOException { - this.lifeCycle = new LifeCycle(id); - minTimeoutMs = properties.getInt( - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY, - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT); - maxTimeoutMs = properties.getInt( - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY, - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT); - Preconditions.checkArgument(maxTimeoutMs > minTimeoutMs, - "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs); - this.properties = properties; - this.stateMachine = stateMachine; - this.state = new ServerState(id, raftConf, properties, this, stateMachine); - appenderFactory = initAppenderFactory(); - } - - public int getMinTimeoutMs() { - return minTimeoutMs; - } - - public int getMaxTimeoutMs() { - return maxTimeoutMs; - } - - public int getRandomTimeoutMs() { - return RaftUtils.getRandomBetween(minTimeoutMs, maxTimeoutMs); - } - - public StateMachine 
getStateMachine() { - return this.stateMachine; - } - - public LogAppenderFactory getLogAppenderFactory() { - return appenderFactory; - } - - private LogAppenderFactory initAppenderFactory() { - Class<? extends LogAppenderFactory> factoryClass = properties.getClass( - RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT, - LogAppenderFactory.class); - return RaftUtils.newInstance(factoryClass); - } - - /** - * Used by tests to set initial raft configuration with correct port bindings. - */ - @VisibleForTesting - public void setInitialConf(RaftConfiguration conf) { - this.state.setInitialConf(conf); - } - - public void setServerRpc(RaftServerRpc serverRpc) { - this.serverRpc = serverRpc; - // add peers into rpc service - RaftConfiguration conf = getRaftConf(); - if (conf != null) { - addPeersToRPC(conf.getPeers()); - } - } - - public RaftServerRpc getServerRpc() { - return serverRpc; - } - - public void start() { - lifeCycle.transition(STARTING); - state.start(); - RaftConfiguration conf = getRaftConf(); - if (conf != null && conf.contains(getId())) { - LOG.debug("{} starts as a follower", getId()); - startAsFollower(); - } else { - LOG.debug("{} starts with initializing state", getId()); - startInitializing(); - } - } - - /** - * The peer belongs to the current configuration, should start as a follower - */ - private void startAsFollower() { - role = Role.FOLLOWER; - heartbeatMonitor = new FollowerState(this); - heartbeatMonitor.start(); - - serverRpc.start(); - lifeCycle.transition(RUNNING); - } - - /** - * The peer does not have any configuration (maybe it will later be included - * in some configuration). Start still as a follower but will not vote or - * start election. 
- */ - private void startInitializing() { - role = Role.FOLLOWER; - // do not start heartbeatMonitoring - serverRpc.start(); - } - - public ServerState getState() { - return this.state; - } - - public String getId() { - return getState().getSelfId(); - } - - public RaftConfiguration getRaftConf() { - return getState().getRaftConf(); - } - - @Override - public void close() { - lifeCycle.checkStateAndClose(() -> { - try { - shutdownHeartbeatMonitor(); - shutdownElectionDaemon(); - shutdownLeaderState(); - - serverRpc.shutdown(); - state.close(); - } catch (Exception ignored) { - LOG.warn("Failed to kill " + state.getSelfId(), ignored); - } - }); - } - - public boolean isAlive() { - return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED); - } - - public boolean isFollower() { - return role == Role.FOLLOWER; - } - - public boolean isCandidate() { - return role == Role.CANDIDATE; - } - - public boolean isLeader() { - return role == Role.LEADER; - } - - Role getRole() { - return role; - } - - /** - * Change the server state to Follower if necessary - * @param newTerm The new term. - * @param sync We will call {@link ServerState#persistMetadata()} if this is - * set to true and term/votedFor get updated. - * @return if the term/votedFor should be updated to the new term - * @throws IOException if term/votedFor persistence failed. 
- */ - synchronized boolean changeToFollower(long newTerm, boolean sync) - throws IOException { - final Role old = role; - role = Role.FOLLOWER; - - boolean metadataUpdated = false; - if (newTerm > state.getCurrentTerm()) { - state.setCurrentTerm(newTerm); - state.resetLeaderAndVotedFor(); - metadataUpdated = true; - } - - if (old == Role.LEADER) { - assert leaderState != null; - shutdownLeaderState(); - } else if (old == Role.CANDIDATE) { - shutdownElectionDaemon(); - } - - if (old != Role.FOLLOWER) { - heartbeatMonitor = new FollowerState(this); - heartbeatMonitor.start(); - } - - if (metadataUpdated && sync) { - state.persistMetadata(); - } - return metadataUpdated; - } - - private synchronized void shutdownLeaderState() { - final LeaderState leader = leaderState; - if (leader != null) { - leader.stop(); - } - leaderState = null; - // TODO: make sure that StateMachineUpdater has applied all transactions that have context - } - - private void shutdownElectionDaemon() { - final LeaderElection election = electionDaemon; - if (election != null) { - election.stopRunning(); - // no need to interrupt the election thread - } - electionDaemon = null; - } - - synchronized void changeToLeader() { - Preconditions.checkState(isCandidate()); - shutdownElectionDaemon(); - role = Role.LEADER; - state.becomeLeader(); - // start sending AppendEntries RPC to followers - leaderState = new LeaderState(this, properties); - leaderState.start(); - } - - private void shutdownHeartbeatMonitor() { - final FollowerState hm = heartbeatMonitor; - if (hm != null) { - hm.stopRunning(); - hm.interrupt(); - } - heartbeatMonitor = null; - } - - synchronized void changeToCandidate() { - Preconditions.checkState(isFollower()); - shutdownHeartbeatMonitor(); - role = Role.CANDIDATE; - // start election - electionDaemon = new LeaderElection(this); - electionDaemon.start(); - } - - @Override - public String toString() { - return role + " " + state + " " + lifeCycle.getCurrentState(); - } - - /** - * 
@return null if the server is in leader state. - */ - CompletableFuture<RaftClientReply> checkLeaderState( - RaftClientRequest request) { - if (!isLeader()) { - NotLeaderException exception = generateNotLeaderException(); - CompletableFuture<RaftClientReply> future = new CompletableFuture<>(); - future.complete(new RaftClientReply(request, exception)); - return future; - } - return null; - } - - NotLeaderException generateNotLeaderException() { - if (lifeCycle.getCurrentState() != RUNNING) { - return new NotLeaderException(getId(), null, null); - } - String leaderId = state.getLeaderId(); - if (leaderId == null || leaderId.equals(state.getSelfId())) { - // No idea about who is the current leader. Or the peer is the current - // leader, but it is about to step down - RaftPeer suggestedLeader = state.getRaftConf() - .getRandomPeer(state.getSelfId()); - leaderId = suggestedLeader == null ? null : suggestedLeader.getId(); - } - RaftConfiguration conf = getRaftConf(); - Collection<RaftPeer> peers = conf.getPeers(); - return new NotLeaderException(getId(), conf.getPeer(leaderId), - peers.toArray(new RaftPeer[peers.size()])); - } - - /** - * Handle a normal update request from client. 
- */ - public CompletableFuture<RaftClientReply> appendTransaction( - RaftClientRequest request, TransactionContext entry) - throws RaftException { - LOG.debug("{}: receive client request({})", getId(), request); - lifeCycle.assertCurrentState(RUNNING); - CompletableFuture<RaftClientReply> reply; - - final PendingRequest pending; - synchronized (this) { - reply = checkLeaderState(request); - if (reply != null) { - return reply; - } - - // append the message to its local log - final long entryIndex; - try { - entryIndex = state.applyLog(entry); - } catch (IOException e) { - throw new RaftException(e); - } - - // put the request into the pending queue - pending = leaderState.addPendingRequest(entryIndex, request, entry); - leaderState.notifySenders(); - } - return pending.getFuture(); - } - - /** - * Handle a raft configuration change request from client. - */ - public CompletableFuture<RaftClientReply> setConfiguration( - SetConfigurationRequest request) throws IOException { - LOG.debug("{}: receive setConfiguration({})", getId(), request); - lifeCycle.assertCurrentState(RUNNING); - CompletableFuture<RaftClientReply> reply = checkLeaderState(request); - if (reply != null) { - return reply; - } - - final RaftPeer[] peersInNewConf = request.getPeersInNewConf(); - final PendingRequest pending; - synchronized (this) { - reply = checkLeaderState(request); - if (reply != null) { - return reply; - } - - final RaftConfiguration current = getRaftConf(); - // make sure there is no other raft reconfiguration in progress - if (!current.isStable() || leaderState.inStagingState() || - !state.isCurrentConfCommitted()) { - throw new ReconfigurationInProgressException( - "Reconfiguration is already in progress: " + current); - } - - // return true if the new configuration is the same with the current one - if (current.hasNoChange(peersInNewConf)) { - pending = leaderState.returnNoConfChange(request); - return pending.getFuture(); - } - - // add new peers into the rpc service - 
addPeersToRPC(Arrays.asList(peersInNewConf)); - // add staging state into the leaderState - pending = leaderState.startSetConfiguration(request); - } - return pending.getFuture(); - } - - private boolean shouldWithholdVotes() { - return isLeader() || (isFollower() && state.hasLeader() - && heartbeatMonitor.shouldWithholdVotes()); - } - - /** - * check if the remote peer is not included in the current conf - * and should shutdown. should shutdown if all the following stands: - * 1. this is a leader - * 2. current conf is stable and has been committed - * 3. candidate id is not included in conf - * 4. candidate's last entry's index < conf's index - */ - private boolean shouldSendShutdown(String candidateId, - TermIndex candidateLastEntry) { - return isLeader() - && getRaftConf().isStable() - && getState().isConfCommitted() - && !getRaftConf().containsInConf(candidateId) - && candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex() - && !leaderState.isBootStrappingPeer(candidateId); - } - - @Override - public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) - throws IOException { - final String candidateId = r.getServerRequest().getRequestorId(); - return requestVote(candidateId, r.getCandidateTerm(), - ServerProtoUtils.toTermIndex(r.getCandidateLastEntry())); - } - - private RequestVoteReplyProto requestVote(String candidateId, - long candidateTerm, TermIndex candidateLastEntry) throws IOException { - CodeInjectionForTesting.execute(REQUEST_VOTE, getId(), - candidateId, candidateTerm, candidateLastEntry); - LOG.debug("{}: receive requestVote({}, {}, {})", - getId(), candidateId, candidateTerm, candidateLastEntry); - lifeCycle.assertCurrentState(RUNNING); - - boolean voteGranted = false; - boolean shouldShutdown = false; - final RequestVoteReplyProto reply; - synchronized (this) { - if (shouldWithholdVotes()) { - LOG.info("{} Withhold vote from server {} with term {}. 
" + - "This server:{}, last rpc time from leader {} is {}", getId(), - candidateId, candidateTerm, this, this.getState().getLeaderId(), - (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1)); - } else if (state.recognizeCandidate(candidateId, candidateTerm)) { - boolean termUpdated = changeToFollower(candidateTerm, false); - // see Section 5.4.1 Election restriction - if (state.isLogUpToDate(candidateLastEntry)) { - heartbeatMonitor.updateLastRpcTime(false); - state.grantVote(candidateId); - voteGranted = true; - } - if (termUpdated || voteGranted) { - state.persistMetadata(); // sync metafile - } - } - if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) { - shouldShutdown = true; - } - reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(), - voteGranted, state.getCurrentTerm(), shouldShutdown); - if (LOG.isDebugEnabled()) { - LOG.debug("{} replies to vote request: {}. Peer's state: {}", - getId(), ProtoUtils.toString(reply), state); - } - } - return reply; - } - - private void validateEntries(long expectedTerm, TermIndex previous, - LogEntryProto... 
entries) { - if (entries != null && entries.length > 0) { - final long index0 = entries[0].getIndex(); - - if (previous == null || previous.getTerm() == 0) { - Preconditions.checkArgument(index0 == 0, - "Unexpected Index: previous is null but entries[%s].getIndex()=%s", - 0, index0); - } else { - Preconditions.checkArgument(previous.getIndex() == index0 - 1, - "Unexpected Index: previous is %s but entries[%s].getIndex()=%s", - previous, 0, index0); - } - - for (int i = 0; i < entries.length; i++) { - final long t = entries[i].getTerm(); - Preconditions.checkArgument(expectedTerm >= t, - "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s", - i, t, expectedTerm); - - final long indexi = entries[i].getIndex(); - Preconditions.checkArgument(indexi == index0 + i, - "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s", - i, indexi, index0); - } - } - } - - @Override - public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r) - throws IOException { - // TODO avoid converting list to array - final LogEntryProto[] entries = r.getEntriesList() - .toArray(new LogEntryProto[r.getEntriesCount()]); - final TermIndex previous = r.hasPreviousLog() ? - ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null; - return appendEntries(r.getServerRequest().getRequestorId(), - r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(), - entries); - } - - private AppendEntriesReplyProto appendEntries(String leaderId, long leaderTerm, - TermIndex previous, long leaderCommit, boolean initializing, - LogEntryProto... 
entries) throws IOException { - CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), - leaderId, leaderTerm, previous, leaderCommit, initializing, entries); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(), - leaderId, leaderTerm, previous, leaderCommit, initializing, - ServerProtoUtils.toString(entries)); - } - lifeCycle.assertCurrentState(STARTING, RUNNING); - - try { - validateEntries(leaderTerm, previous, entries); - } catch (IllegalArgumentException e) { - throw new IOException(e); - } - - final long currentTerm; - long nextIndex = state.getLog().getNextIndex(); - synchronized (this) { - final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); - currentTerm = state.getCurrentTerm(); - if (!recognized) { - final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( - leaderId, getId(), currentTerm, nextIndex, NOT_LEADER); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: do not recognize leader. Reply: {}", - getId(), ProtoUtils.toString(reply)); - } - return reply; - } - changeToFollower(leaderTerm, true); - state.setLeader(leaderId); - - if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) { - heartbeatMonitor = new FollowerState(this); - heartbeatMonitor.start(); - } - if (lifeCycle.getCurrentState() == RUNNING) { - heartbeatMonitor.updateLastRpcTime(true); - } - - // We need to check if "previous" is in the local peer. Note that it is - // possible that "previous" is covered by the latest snapshot: e.g., - // it's possible there's no log entries outside of the latest snapshot. - // However, it is not possible that "previous" index is smaller than the - // last index included in snapshot. This is because indices <= snapshot's - // last index should have been committed. 
- if (previous != null && !containPrevious(previous)) { - final AppendEntriesReplyProto reply = - ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), - currentTerm, Math.min(nextIndex, previous.getIndex()), INCONSISTENCY); - LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}", - getId(), previous, ServerProtoUtils.toString(reply)); - return reply; - } - - state.getLog().append(entries); - state.updateConfiguration(entries); - state.updateStatemachine(leaderCommit, currentTerm); - } - if (entries != null && entries.length > 0) { - try { - state.getLog().logSync(); - } catch (InterruptedException e) { - throw new InterruptedIOException("logSync got interrupted"); - } - nextIndex = entries[entries.length - 1].getIndex() + 1; - } - synchronized (this) { - if (lifeCycle.getCurrentState() == RUNNING && isFollower() - && getState().getCurrentTerm() == currentTerm) { - // reset election timer to avoid punishing the leader for our own - // long disk writes - heartbeatMonitor.updateLastRpcTime(false); - } - } - final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( - leaderId, getId(), currentTerm, nextIndex, SUCCESS); - LOG.debug("{}: succeeded to handle AppendEntries. 
Reply: {}", getId(), - ServerProtoUtils.toString(reply)); - return reply; - } - - private boolean containPrevious(TermIndex previous) { - LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}", - getId(), previous, state.getLatestSnapshot(), state.getLatestInstalledSnapshot()); - return state.getLog().contains(previous) - || (state.getLatestSnapshot() != null - && state.getLatestSnapshot().getTermIndex().equals(previous)) - || (state.getLatestInstalledSnapshot() != null) - && state.getLatestInstalledSnapshot().equals(previous); - } - - @Override - public InstallSnapshotReplyProto installSnapshot( - InstallSnapshotRequestProto request) throws IOException { - final String leaderId = request.getServerRequest().getRequestorId(); - CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, request); - LOG.debug("{}: receive installSnapshot({})", getId(), request); - - lifeCycle.assertCurrentState(STARTING, RUNNING); - - final long currentTerm; - final long leaderTerm = request.getLeaderTerm(); - final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex( - request.getTermIndex()); - final long lastIncludedIndex = lastTermIndex.getIndex(); - synchronized (this) { - final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); - currentTerm = state.getCurrentTerm(); - if (!recognized) { - final InstallSnapshotReplyProto reply = ServerProtoUtils - .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm, - request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); - LOG.debug("{}: do not recognize leader for installing snapshot." + - " Reply: {}", getId(), reply); - return reply; - } - changeToFollower(leaderTerm, true); - state.setLeader(leaderId); - - if (lifeCycle.getCurrentState() == RUNNING) { - heartbeatMonitor.updateLastRpcTime(true); - } - - // Check and append the snapshot chunk. 
We simply put this in lock - // considering a follower peer requiring a snapshot installation does not - // have a lot of requests - Preconditions.checkState( - state.getLog().getNextIndex() <= lastIncludedIndex, - "%s log's next id is %s, last included index in snapshot is %s", - getId(), state.getLog().getNextIndex(), lastIncludedIndex); - - //TODO: We should only update State with installed snapshot once the request is done. - state.installSnapshot(request); - - // update the committed index - // re-load the state machine if this is the last chunk - if (request.getDone()) { - state.reloadStateMachine(lastIncludedIndex, leaderTerm); - } - if (lifeCycle.getCurrentState() == RUNNING) { - heartbeatMonitor.updateLastRpcTime(false); - } - } - if (request.getDone()) { - LOG.info("{}: successfully install the whole snapshot-{}", getId(), - lastIncludedIndex); - } - return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(), - currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS); - } - - AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm, - String targetId, TermIndex previous, List<LogEntryProto> entries, - boolean initializing) { - return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId, - leaderTerm, entries, state.getLog().getLastCommittedIndex(), - initializing, previous); - } - - synchronized InstallSnapshotRequestProto createInstallSnapshotRequest( - String targetId, String requestId, int requestIndex, SnapshotInfo snapshot, - List<FileChunkProto> chunks, boolean done) { - OptionalLong totalSize = snapshot.getFiles().stream() - .mapToLong(FileInfo::getFileSize).reduce(Long::sum); - assert totalSize.isPresent(); - return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId, - requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(), - chunks, totalSize.getAsLong(), done); - } - - synchronized RequestVoteRequestProto createRequestVoteRequest(String targetId, - long term, 
TermIndex lastEntry) { - return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term, - lastEntry); - } - - public synchronized void submitLocalSyncEvent() { - if (isLeader() && leaderState != null) { - leaderState.submitUpdateStateEvent(UPDATE_COMMIT_EVENT); - } - } - - public void addPeersToRPC(Iterable<RaftPeer> peers) { - serverRpc.addPeers(peers); - } - - synchronized void replyPendingRequest(long logIndex, - CompletableFuture<Message> message) { - if (isLeader() && leaderState != null) { // is leader and is running - leaderState.replyPendingRequest(logIndex, message); - } - } - - TransactionContext getTransactionContext(long index) { - if (leaderState != null) { // is leader and is running - return leaderState.getTransactionContext(index); - } - return null; - } - - public RaftProperties getProperties() { - return this.properties; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java b/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java index 837b53b..2ce0326 100644 --- a/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java +++ b/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java @@ -17,6 +17,7 @@ */ package org.apache.raft.server; +import org.apache.raft.server.impl.LogAppenderFactory; import org.apache.raft.util.NetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftServerConstants.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServerConstants.java 
b/raft-server/src/main/java/org/apache/raft/server/RaftServerConstants.java deleted file mode 100644 index f6781f3..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/RaftServerConstants.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server; - -import org.apache.raft.client.RaftClient; - -public interface RaftServerConstants { - long INVALID_LOG_INDEX = -1; - byte LOG_TERMINATE_BYTE = 0; - long DEFAULT_SEQNUM = RaftClient.DEFAULT_SEQNUM; - - enum StartupOption { - FORMAT("format"), - REGULAR("regular"); - - private final String option; - - StartupOption(String arg) { - this.option = arg; - } - - public static StartupOption getOption(String arg) { - for (StartupOption s : StartupOption.values()) { - if (s.option.equals(arg)) { - return s; - } - } - return REGULAR; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java b/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java deleted file mode 100644 index de81ec2..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server; - -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.shaded.proto.RaftProtos.*; - -import java.io.IOException; -import java.net.InetSocketAddress; - -/** Declares start and shutdown methods, a socket-address accessor, one send method each for the AppendEntries, InstallSnapshot and RequestVote requests, and peer registration. */ public interface RaftServerRpc { - /** NOTE(review): presumably brings the RPC service up; the contract is set by implementations, confirm there. */ void start(); - - /** NOTE(review): presumably stops the RPC service; confirm with implementations. */ void shutdown(); - - /** NOTE(review): presumably the address this RPC service listens on; confirm with implementations. */ InetSocketAddress getInetSocketAddress(); - - /** Takes an AppendEntries request and returns the matching reply; declared to throw IOException. */ AppendEntriesReplyProto sendAppendEntries( - AppendEntriesRequestProto request) throws IOException; - - /** Takes an InstallSnapshot request and returns the matching reply; declared to throw IOException. */ InstallSnapshotReplyProto sendInstallSnapshot( - InstallSnapshotRequestProto request) throws IOException; - - /** Takes a RequestVote request and returns the matching reply; declared to throw IOException. */ RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request) - throws IOException; - - /** Adds information about the given peers. */ - void addPeers(Iterable<RaftPeer> peers); -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RequestDispatcher.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/RequestDispatcher.java b/raft-server/src/main/java/org/apache/raft/server/RequestDispatcher.java deleted file mode 100644 index e281bfa..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/RequestDispatcher.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server; - -import org.apache.raft.protocol.*; -import org.apache.raft.server.protocol.RaftServerProtocol; -import org.apache.raft.shaded.proto.RaftProtos.*; -import org.apache.raft.statemachine.StateMachine; -import org.apache.raft.statemachine.TransactionContext; -import org.apache.raft.util.RaftUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - -/** - * Each RPC request is first handled by the RequestDispatcher: - * 1. A request from another RaftPeer is to be handled by RaftServer. - * - * If the raft peer is the leader, then: - * - * 2. A read-only request from client is to be handled by the state machine. - * 3. A write request from client is first validated by the state machine. The - * state machine returns the content of the raft log entry, which is then passed - * to the RaftServer for replication. - */ -public class RequestDispatcher implements RaftClientProtocol, RaftServerProtocol { - static final Logger LOG = LoggerFactory.getLogger(RequestDispatcher.class); - - private final RaftServer server; - private final StateMachine stateMachine; - - /** Stores the server and the state machine obtained from {@code server.getStateMachine()}. */ public RequestDispatcher(RaftServer server) { - this.server = server; - this.stateMachine = server.getStateMachine(); - } - - /** Handles a client request without blocking on the result. Returns the future from {@code server.checkLeaderState(request)} when that is non-null; otherwise a read-only request is answered by {@code stateMachine.query}, and any other request is turned into a transaction by {@code stateMachine.startTransaction} and handed to {@code server.appendTransaction}. Throws IOException when the started transaction carries an exception. */ public CompletableFuture<RaftClientReply> handleClientRequest( - RaftClientRequest request) throws IOException { - // first check the server's leader state - CompletableFuture<RaftClientReply> reply = server.checkLeaderState(request); - if (reply != null) { - return reply; - } - - // let the state machine handle read-only request from client - if (request.isReadOnly()) { - // TODO: We might not be the leader anymore by the time this completes.
See the RAFT paper, - // section 8 (last part) - return stateMachine.query(request); - } - - // TODO: this client request will not be added to pending requests - // until later which means that any failure in between will leave partial state in the - // state machine. We should call cancelTransaction() for failed requests - TransactionContext entry = stateMachine.startTransaction(request); - if (entry.getException().isPresent()) { - throw RaftUtils.asIOException(entry.getException().get()); - } - - return server.appendTransaction(request, entry); - } - - /** Blocking form of handleClientRequest: waits for the returned future and yields its reply. */ @Override - public RaftClientReply submitClientRequest(RaftClientRequest request) - throws IOException { - return waitForReply(server.getId(), request, handleClientRequest(request)); - } - - /** Returns the future produced by {@code server.setConfiguration(request)}. */ public CompletableFuture<RaftClientReply> setConfigurationAsync( - SetConfigurationRequest request) throws IOException { - return server.setConfiguration(request); - } - - /** Blocking form of setConfigurationAsync: waits for the returned future and yields its reply. */ @Override - public RaftClientReply setConfiguration(SetConfigurationRequest request) - throws IOException { - return waitForReply(server.getId(), request, setConfigurationAsync(request)); - } - - /** Blocks on {@code future.get()} and returns its value. An interrupt is logged at info level and rethrown through {@code RaftUtils.toInterruptedIOException}. On ExecutionException: a null cause wraps the exception in an IOException, a NotLeaderException cause is returned as a RaftClientReply, and any other cause is rethrown through {@code RaftUtils.asIOException}. The serverId is used only in the interrupt message. */ private static RaftClientReply waitForReply(String serverId, - RaftClientRequest request, CompletableFuture<RaftClientReply> future) - throws IOException { - try { - return future.get(); - } catch (InterruptedException e) { - final String s = serverId + ": Interrupted when waiting for reply, request=" + request; - LOG.info(s, e); - throw RaftUtils.toInterruptedIOException(s, e); - } catch (ExecutionException e) { - final Throwable cause = e.getCause(); - if (cause == null) { - /* no cause available: wrap the ExecutionException itself */ throw new IOException(e); - } - if (cause instanceof NotLeaderException) { - /* reported to the caller as a reply rather than thrown */ return new RaftClientReply(request, (NotLeaderException)cause); - } else { - throw RaftUtils.asIOException(cause); - } - } - } - - /** Delegates to {@code server.requestVote(request)}. */ @Override - public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) - throws IOException { - return server.requestVote(request); - } - - /** Delegates to {@code server.appendEntries(request)}. */ @Override - public
AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) - throws IOException { - return server.appendEntries(request); - } - - /** Delegates to {@code server.installSnapshot(request)}. */ @Override - public InstallSnapshotReplyProto installSnapshot( - InstallSnapshotRequestProto request) throws IOException { - return server.installSnapshot(request); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/Role.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/Role.java b/raft-server/src/main/java/org/apache/raft/server/Role.java deleted file mode 100644 index a7e2f4c..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/Role.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server; - -/** - * Role of Raft peer: one of LEADER, CANDIDATE or FOLLOWER. - */ -public enum Role { - LEADER, CANDIDATE, FOLLOWER -}
