Repository: incubator-ratis Updated Branches: refs/heads/master f6814c6b4 -> bbfb8754d
RATIS-336. LeaderState.isBootStrappingPeer may have NPE. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/bbfb8754 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/bbfb8754 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/bbfb8754 Branch: refs/heads/master Commit: bbfb8754d136a5404e8c2a813a7468d94165d80c Parents: f6814c6 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Fri Oct 5 14:31:53 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Fri Oct 5 14:31:53 2018 +0800 ---------------------------------------------------------------------- .../ratis/server/impl/LeaderElection.java | 2 +- .../apache/ratis/server/impl/LeaderState.java | 173 ++++++++++++------- .../apache/ratis/server/impl/LogAppender.java | 15 +- .../ratis/server/impl/RaftServerImpl.java | 27 ++- .../ratis/server/storage/RaftLogWorker.java | 18 +- .../ratis/server/storage/SegmentedRaftLog.java | 3 +- 6 files changed, 131 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bbfb8754/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index c60352d..d62b1a7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -170,7 +170,7 @@ class LeaderElection extends Daemon { case DISCOVERED_A_NEW_TERM: final long term = r.term > server.getState().getCurrentTerm() ? r.term : server.getState().getCurrentTerm(); - server.changeToFollower(term, true); + server.changeToFollowerAndPersistMetadata(term); return; case TIMEOUT: // should start another election http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bbfb8754/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index be54346..b4b613e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -40,8 +40,6 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.*; - /** * States for leader only. It contains three different types of processors: * 1. RPC senders: each thread is appending log to a follower @@ -54,21 +52,76 @@ public class LeaderState { private static final Logger LOG = RaftServerImpl.LOG; public static final String APPEND_PLACEHOLDER = LeaderState.class.getSimpleName() + ".placeholder"; - enum StateUpdateEventType { - STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS - } - - enum BootStrapProgress { + private enum BootStrapProgress { NOPROGRESS, PROGRESSING, CAUGHTUP } static class StateUpdateEvent { - final StateUpdateEventType type; + private enum Type { + STEP_DOWN, UPDATE_COMMIT, CHECK_STAGING + } + + final Type type; final long newTerm; + final Runnable handler; - StateUpdateEvent(StateUpdateEventType type, long newTerm) { + StateUpdateEvent(Type type, long newTerm, Runnable handler) { this.type = type; this.newTerm = newTerm; + this.handler = handler; + } + + void execute() { + handler.run(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (!(obj instanceof StateUpdateEvent)) { + return false; + } + final StateUpdateEvent that = (StateUpdateEvent)obj; + return this.type == that.type && this.newTerm == that.newTerm; + } + + @Override + public String toString() { + return type + (newTerm >= 0? ":" + newTerm: ""); + } + } + + private class EventQueue { + private final BlockingQueue<StateUpdateEvent> queue = new ArrayBlockingQueue<>(4096); + + void submit(StateUpdateEvent event) { + try { + queue.put(event); + } catch (InterruptedException e) { + LOG.info("{}: Interrupted when submitting {} ", server.getId(), event); + } + } + + StateUpdateEvent poll() { + final StateUpdateEvent e; + try { + e = queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS); + } catch(InterruptedException ie) { + String s = server.getId() + ": " + getClass().getSimpleName() + " thread is interrupted"; + if (!running) { + LOG.info(s + " gracefully"); + return null; + } else { + throw new IllegalStateException(s + " UNEXPECTEDLY", ie); + } + } + + if (e != null) { + // remove duplicated events from the head. + for(; e.equals(queue.peek()); queue.poll()); + } + return e; } } @@ -101,10 +154,10 @@ public class LeaderState { } } - static final StateUpdateEvent UPDATE_COMMIT_EVENT = - new StateUpdateEvent(StateUpdateEventType.UPDATECOMMIT, -1); - static final StateUpdateEvent STAGING_PROGRESS_EVENT = - new StateUpdateEvent(StateUpdateEventType.STAGINGPROGRESS, -1); + private final StateUpdateEvent UPDATE_COMMIT_EVENT = + new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, this::updateCommit); + private final StateUpdateEvent CHECK_STAGING_EVENT = + new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, -1, this::checkStaging); private final RaftServerImpl server; private final RaftLog raftLog; @@ -117,7 +170,7 @@ public class LeaderState { * The list is protected by the RaftServer's lock. */ private final SenderList senders; - private final BlockingQueue<StateUpdateEvent> eventQ; + private final EventQueue eventQueue = new EventQueue(); private final EventProcessor processor; private final PendingRequests pendingRequests; private volatile boolean running = true; @@ -135,7 +188,6 @@ public class LeaderState { final ServerState state = server.getState(); this.raftLog = state.getLog(); this.currentTerm = state.getCurrentTerm(); - eventQ = new ArrayBlockingQueue<>(4096); processor = new EventProcessor(); pendingRequests = new PendingRequests(server); @@ -192,10 +244,6 @@ public class LeaderState { return stagingState != null; } - ConfigurationStagingState getStagingState() { - return stagingState; - } - long getCurrentTerm() { return currentTerm; } @@ -299,11 +347,25 @@ public class LeaderState { stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollower().getPeer().getId())); } - void submitUpdateStateEvent(StateUpdateEvent event) { + void submitStepDownEvent() { + submitStepDownEvent(getCurrentTerm()); + } + + void submitStepDownEvent(long term) { + eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, term, () -> stepDown(term))); + } + + private void stepDown(long term) { try { - eventQ.put(event); - } catch (InterruptedException e) { - LOG.info("Interrupted when adding event {} into the queue", event); + server.changeToFollowerAndPersistMetadata(term); + } catch(IOException e) { + final String s = server.getId() + ": Failed to persist metadata for term " + term; + LOG.warn(s, e); + // the failure should happen while changing the state to follower + // thus the in-memory state should have been updated + if (running) { + throw new IllegalStateException(s + " and running == true", e); + } } } @@ -331,50 +393,20 @@ public class LeaderState { prepare(); while (running) { - try { - StateUpdateEvent event = eventQ.poll(server.getMaxTimeoutMs(), - TimeUnit.MILLISECONDS); - synchronized (server) { - if (running) { - handleEvent(event); + final StateUpdateEvent event = eventQueue.poll(); + synchronized(server) { + if (running) { + if (event != null) { + event.execute(); + } else if (inStagingState()) { + checkStaging(); } } - // the updated configuration does not need to be sync'ed here - } catch (InterruptedException e) { - final String s = server.getId() + " " + getClass().getSimpleName() - + " thread is interrupted "; - if (!running) { - LOG.info(s + " gracefully; server=" + server); - } else { - LOG.warn(s + " UNEXPECTEDLY; server=" + server, e); - throw new RuntimeException(e); - } - } catch (IOException e) { - LOG.warn("Failed to persist new votedFor/term.", e); - // the failure should happen while changing the state to follower - // thus the in-memory state should have been updated - Preconditions.assertTrue(!running); } } } } - private void handleEvent(StateUpdateEvent e) throws IOException { - if (e == null) { - if (inStagingState()) { - checkNewPeers(); - } - } else { - if (e.type == STEPDOWN) { - server.changeToFollower(e.newTerm, true); - } else if (e.type == UPDATECOMMIT) { - updateLastCommitted(); - } else if (e.type == STAGINGPROGRESS) { - checkNewPeers(); - } - } - } - /** * So far we use a simple implementation for catchup checking: * 1. If the latest rpc time of the remote peer is before 3 * max_timeout, @@ -410,11 +442,14 @@ public class LeaderState { .collect(Collectors.toCollection(ArrayList::new)); } - private void checkNewPeers() { + void submitCheckStagingEvent() { + eventQueue.submit(CHECK_STAGING_EVENT); + } + + private void checkStaging() { if (!inStagingState()) { - // it is possible that the bootstrapping is done and we still have - // remaining STAGINGPROGRESS event to handle. - updateLastCommitted(); + // it is possible that the bootstrapping is done. Then, fallback to UPDATE_COMMIT + UPDATE_COMMIT_EVENT.execute(); } else { final long committedIndex = server.getState().getLog() .getLastCommittedIndex(); @@ -431,10 +466,14 @@ public class LeaderState { } boolean isBootStrappingPeer(RaftPeerId peerId) { - return inStagingState() && getStagingState().contains(peerId); + return Optional.ofNullable(stagingState).map(s -> s.contains(peerId)).orElse(false); + } + + void submitUpdateCommitEvent() { + eventQueue.submit(UPDATE_COMMIT_EVENT); } - private void updateLastCommitted() { + private void updateCommit() { final RaftPeerId selfId = server.getId(); final RaftConfiguration conf = server.getRaftConf(); @@ -575,7 +614,7 @@ public class LeaderState { /** @return true if the request is replied; otherwise, the reply is delayed, return false. */ boolean replyPendingRequest(long logIndex, RaftClientReply reply, RetryCache.CacheEntry cacheEntry) { if (!pendingRequests.replyPendingRequest(logIndex, reply, cacheEntry)) { - submitUpdateStateEvent(UPDATE_COMMIT_EVENT); + submitUpdateCommitEvent(); return false; } return true; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bbfb8754/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index c237917..3c9b2d4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -20,7 +20,6 @@ package org.apache.ratis.server.impl; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftLog.EntryWithData; import org.apache.ratis.server.storage.FileInfo; @@ -45,7 +44,6 @@ import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX import static org.apache.ratis.util.LifeCycle.State.CLOSED; import static org.apache.ratis.util.LifeCycle.State.CLOSING; import static org.apache.ratis.util.LifeCycle.State.EXCEPTION; -import static org.apache.ratis.util.LifeCycle.State.NEW; import static org.apache.ratis.util.LifeCycle.State.RUNNING; import static org.apache.ratis.util.LifeCycle.State.STARTING; @@ -491,10 +489,11 @@ public class LogAppender { } protected void submitEventOnSuccessAppend() { - LeaderState.StateUpdateEvent e = follower.isAttendingVote() ? - LeaderState.UPDATE_COMMIT_EVENT : - LeaderState.STAGING_PROGRESS_EVENT; - leaderState.submitUpdateStateEvent(e); + if (follower.isAttendingVote()) { + leaderState.submitUpdateCommitEvent(); + } else { + leaderState.submitCheckStagingEvent(); + } } protected void checkSlowness() { @@ -531,9 +530,7 @@ public class LogAppender { synchronized (server) { if (isAppenderRunning() && follower.isAttendingVote() && responseTerm > leaderState.getCurrentTerm()) { - leaderState.submitUpdateStateEvent( - new LeaderState.StateUpdateEvent(StateUpdateEventType.STEPDOWN, - responseTerm)); + leaderState.submitStepDownEvent(responseTerm); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bbfb8754/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 7fde782..d4b32a1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -295,13 +295,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou /** * Change the server state to Follower if necessary * @param newTerm The new term. - * @param sync We will call {@link ServerState#persistMetadata()} if this is - * set to true and term/votedFor get updated. * @return if the term/votedFor should be updated to the new term * @throws IOException if term/votedFor persistence failed. */ - synchronized boolean changeToFollower(long newTerm, boolean sync) - throws IOException { + private synchronized boolean changeToFollower(long newTerm) { final RaftPeerRole old = role.getCurrentRole(); final boolean metadataUpdated = state.updateCurrentTerm(newTerm); @@ -314,11 +311,13 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } startHeartbeatMonitor(); } + return metadataUpdated; + } - if (metadataUpdated && sync) { + synchronized void changeToFollowerAndPersistMetadata(long newTerm) throws IOException { + if (changeToFollower(newTerm)) { state.persistMetadata(); } - return metadataUpdated; } private synchronized void shutdownLeaderState(boolean allowNull) { @@ -546,9 +545,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou cacheEntry.failWithReply(exceptionReply); // leader will step down here if (isLeader() && leaderState != null) { - leaderState.submitUpdateStateEvent(new LeaderState.StateUpdateEvent( - LeaderState.StateUpdateEventType.STEPDOWN, - leaderState.getCurrentTerm())); + leaderState.submitStepDownEvent(); } return CompletableFuture.completedFuture(exceptionReply); } @@ -777,7 +774,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou getId(), role, candidateId, candidateTerm, state.getLeaderId(), state.getCurrentTerm(), isFollower()? heartbeatMonitor.getLastRpcTime().elapsedTimeMs() + "ms": null); } else if (state.recognizeCandidate(candidateId, candidateTerm)) { - boolean termUpdated = changeToFollower(candidateTerm, false); + final boolean termUpdated = changeToFollower(candidateTerm); // see Section 5.4.1 Election restriction if (state.isLogUpToDate(candidateLastEntry)) { heartbeatMonitor.updateLastRpcTime(false); @@ -910,7 +907,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } return CompletableFuture.completedFuture(reply); } - changeToFollower(leaderTerm, true); + changeToFollowerAndPersistMetadata(leaderTerm); state.setLeader(leaderId, "appendEntries"); if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) { @@ -1010,7 +1007,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou " Reply: {}", getId(), reply); return reply; } - changeToFollower(leaderTerm, true); + changeToFollowerAndPersistMetadata(leaderTerm); state.setLeader(leaderId, "installSnapshot"); if (lifeCycle.getCurrentState() == RUNNING) { @@ -1062,10 +1059,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou groupId, term, lastEntry); } - public synchronized void submitLocalSyncEvent() { - if (isLeader() && leaderState != null) { - leaderState.submitUpdateStateEvent(LeaderState.UPDATE_COMMIT_EVENT); - } + public void submitUpdateCommitEvent() { + Optional.ofNullable(leaderState).ifPresent(LeaderState::submitUpdateCommitEvent); } /** http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bbfb8754/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index c0d1cb9..715370b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -61,7 +61,7 @@ class RaftLogWorker implements Runnable { private final RaftStorage storage; private volatile LogOutputStream out; - private final RaftServerImpl raftServer; + private final Runnable submitUpdateCommitEvent; private final StateMachine stateMachine; private final Supplier<Timer> logFlushTimer; @@ -86,7 +86,7 @@ class RaftLogWorker implements Runnable { this.name = selfId + "-" + getClass().getSimpleName(); LOG.info("new {} for {}", name, storage); - this.raftServer = raftServer; + this.submitUpdateCommitEvent = raftServer != null? raftServer::submitUpdateCommitEvent: () -> {}; this.stateMachine = raftServer != null? raftServer.getStateMachine(): null; this.storage = storage; @@ -100,11 +100,8 @@ class RaftLogWorker implements Runnable { this.workerThread = new Thread(this, name); // Server Id can be null in unit tests - Supplier<String> serverId = () -> raftServer == null || raftServer.getId() == null - ? "null" : raftServer.getId().toString(); this.logFlushTimer = JavaUtils.memoize(() -> RatisMetricsRegistry.getRegistry() - .timer(MetricRegistry.name(RaftLogWorker.class, serverId.get(), - "flush-time"))); + .timer(MetricRegistry.name(RaftLogWorker.class, selfId.toString(), "flush-time"))); } void start(long latestIndex, File openSegmentFile) throws IOException { @@ -243,9 +240,7 @@ class RaftLogWorker implements Runnable { private void updateFlushedIndex() { flushedIndex = lastWrittenIndex; pendingFlushNum = 0; - if (raftServer != null) { - raftServer.submitLocalSyncEvent(); - } + submitUpdateCommitEvent.run(); } /** @@ -288,9 +283,8 @@ class RaftLogWorker implements Runnable { // this.entry != entry iff the entry has state machine data this.stateMachineFuture = stateMachine.writeStateMachineData(entry); } catch (Throwable e) { - LOG.error("{}: writeStateMachineData failed for index:{} proto:{}", - raftServer.getId() ,entry.getIndex(), - ServerProtoUtils.toString(entry), e.getMessage()); + LOG.error(name + ": writeStateMachineData failed for index " + entry.getIndex() + + ", entry=" + ServerProtoUtils.toLogEntryString(entry), e); throw e; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bbfb8754/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index 4d59bf3..862e21f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -105,8 +105,7 @@ public class SegmentedRaftLog extends RaftLog { private final long segmentMaxSize; public SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server, - RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) - throws IOException { + RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) { super(selfId, RaftServerConfigKeys.Log.Appender.bufferCapacity(properties) .getSizeInt()); this.server = server;
