Repository: incubator-ratis
Updated Branches:
  refs/heads/master f6814c6b4 -> bbfb8754d


RATIS-336. LeaderState.isBootStrappingPeer may have NPE.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/bbfb8754
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/bbfb8754
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/bbfb8754

Branch: refs/heads/master
Commit: bbfb8754d136a5404e8c2a813a7468d94165d80c
Parents: f6814c6
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Fri Oct 5 14:31:53 2018 +0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Fri Oct 5 14:31:53 2018 +0800

----------------------------------------------------------------------
 .../ratis/server/impl/LeaderElection.java       |   2 +-
 .../apache/ratis/server/impl/LeaderState.java   | 173 ++++++++++++-------
 .../apache/ratis/server/impl/LogAppender.java   |  15 +-
 .../ratis/server/impl/RaftServerImpl.java       |  27 ++-
 .../ratis/server/storage/RaftLogWorker.java     |  18 +-
 .../ratis/server/storage/SegmentedRaftLog.java  |   3 +-
 6 files changed, 131 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bbfb8754/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index c60352d..d62b1a7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -170,7 +170,7 @@ class LeaderElection extends Daemon {
           case DISCOVERED_A_NEW_TERM:
             final long term = r.term > server.getState().getCurrentTerm() ?
                 r.term : server.getState().getCurrentTerm();
-            server.changeToFollower(term, true);
+            server.changeToFollowerAndPersistMetadata(term);
             return;
           case TIMEOUT:
             // should start another election

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bbfb8754/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index be54346..b4b613e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -40,8 +40,6 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.*;
-
 /**
  * States for leader only. It contains three different types of processors:
  * 1. RPC senders: each thread is appending log to a follower
@@ -54,21 +52,76 @@ public class LeaderState {
   private static final Logger LOG = RaftServerImpl.LOG;
   public static final String APPEND_PLACEHOLDER = LeaderState.class.getSimpleName() + ".placeholder";
 
-  enum StateUpdateEventType {
-    STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS
-  }
-
-  enum BootStrapProgress {
+  private enum BootStrapProgress {
     NOPROGRESS, PROGRESSING, CAUGHTUP
   }
 
   static class StateUpdateEvent {
-    final StateUpdateEventType type;
+    private enum Type {
+      STEP_DOWN, UPDATE_COMMIT, CHECK_STAGING
+    }
+
+    final Type type;
     final long newTerm;
+    final Runnable handler;
 
-    StateUpdateEvent(StateUpdateEventType type, long newTerm) {
+    StateUpdateEvent(Type type, long newTerm, Runnable handler) {
       this.type = type;
       this.newTerm = newTerm;
+      this.handler = handler;
+    }
+
+    void execute() {
+      handler.run();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      } else if (!(obj instanceof StateUpdateEvent)) {
+        return false;
+      }
+      final StateUpdateEvent that = (StateUpdateEvent)obj;
+      return this.type == that.type && this.newTerm == that.newTerm;
+    }
+
+    @Override
+    public String toString() {
+      return type + (newTerm >= 0? ":" + newTerm: "");
+    }
+  }
+
+  private class EventQueue {
+    private final BlockingQueue<StateUpdateEvent> queue = new ArrayBlockingQueue<>(4096);
+
+    void submit(StateUpdateEvent event) {
+      try {
+        queue.put(event);
+      } catch (InterruptedException e) {
+        LOG.info("{}: Interrupted when submitting {} ", server.getId(), event);
+      }
+    }
+
+    StateUpdateEvent poll() {
+      final StateUpdateEvent e;
+      try {
+        e = queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS);
+      } catch(InterruptedException ie) {
+        String s = server.getId() + ": " + getClass().getSimpleName() + " thread is interrupted";
+        if (!running) {
+          LOG.info(s + " gracefully");
+          return null;
+        } else {
+          throw new IllegalStateException(s + " UNEXPECTEDLY", ie);
+        }
+      }
+
+      if (e != null) {
+        // remove duplicated events from the head.
+        for(; e.equals(queue.peek()); queue.poll());
+      }
+      return e;
     }
   }
 
@@ -101,10 +154,10 @@ public class LeaderState {
     }
   }
 
-  static final StateUpdateEvent UPDATE_COMMIT_EVENT =
-      new StateUpdateEvent(StateUpdateEventType.UPDATECOMMIT, -1);
-  static final StateUpdateEvent STAGING_PROGRESS_EVENT =
-      new StateUpdateEvent(StateUpdateEventType.STAGINGPROGRESS, -1);
+  private final StateUpdateEvent UPDATE_COMMIT_EVENT =
+      new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, this::updateCommit);
+  private final StateUpdateEvent CHECK_STAGING_EVENT =
+      new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, -1, this::checkStaging);
 
   private final RaftServerImpl server;
   private final RaftLog raftLog;
@@ -117,7 +170,7 @@ public class LeaderState {
    * The list is protected by the RaftServer's lock.
    */
   private final SenderList senders;
-  private final BlockingQueue<StateUpdateEvent> eventQ;
+  private final EventQueue eventQueue = new EventQueue();
   private final EventProcessor processor;
   private final PendingRequests pendingRequests;
   private volatile boolean running = true;
@@ -135,7 +188,6 @@ public class LeaderState {
     final ServerState state = server.getState();
     this.raftLog = state.getLog();
     this.currentTerm = state.getCurrentTerm();
-    eventQ = new ArrayBlockingQueue<>(4096);
     processor = new EventProcessor();
     pendingRequests = new PendingRequests(server);
 
@@ -192,10 +244,6 @@ public class LeaderState {
     return stagingState != null;
   }
 
-  ConfigurationStagingState getStagingState() {
-    return stagingState;
-  }
-
   long getCurrentTerm() {
     return currentTerm;
   }
@@ -299,11 +347,25 @@ public class LeaderState {
     stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollower().getPeer().getId()));
   }
 
-  void submitUpdateStateEvent(StateUpdateEvent event) {
+  void submitStepDownEvent() {
+    submitStepDownEvent(getCurrentTerm());
+  }
+
+  void submitStepDownEvent(long term) {
+    eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, term, () -> stepDown(term)));
+  }
+
+  private void stepDown(long term) {
     try {
-      eventQ.put(event);
-    } catch (InterruptedException e) {
-      LOG.info("Interrupted when adding event {} into the queue", event);
+      server.changeToFollowerAndPersistMetadata(term);
+    } catch(IOException e) {
+      final String s = server.getId() + ": Failed to persist metadata for term " + term;
+      LOG.warn(s, e);
+      // the failure should happen while changing the state to follower
+      // thus the in-memory state should have been updated
+      if (running) {
+        throw new IllegalStateException(s + " and running == true", e);
+      }
     }
   }
 
@@ -331,50 +393,20 @@ public class LeaderState {
       prepare();
 
       while (running) {
-        try {
-          StateUpdateEvent event = eventQ.poll(server.getMaxTimeoutMs(),
-              TimeUnit.MILLISECONDS);
-          synchronized (server) {
-            if (running) {
-              handleEvent(event);
+        final StateUpdateEvent event = eventQueue.poll();
+        synchronized(server) {
+          if (running) {
+            if (event != null) {
+              event.execute();
+            } else if (inStagingState()) {
+              checkStaging();
             }
           }
-          // the updated configuration does not need to be sync'ed here
-        } catch (InterruptedException e) {
-          final String s = server.getId() + " " + getClass().getSimpleName()
-              + " thread is interrupted ";
-          if (!running) {
-            LOG.info(s + " gracefully; server=" + server);
-          } else {
-            LOG.warn(s + " UNEXPECTEDLY; server=" + server, e);
-            throw new RuntimeException(e);
-          }
-        } catch (IOException e) {
-          LOG.warn("Failed to persist new votedFor/term.", e);
-          // the failure should happen while changing the state to follower
-          // thus the in-memory state should have been updated
-          Preconditions.assertTrue(!running);
         }
       }
     }
   }
 
-  private void handleEvent(StateUpdateEvent e) throws IOException {
-    if (e == null) {
-      if (inStagingState()) {
-        checkNewPeers();
-      }
-    } else {
-      if (e.type == STEPDOWN) {
-        server.changeToFollower(e.newTerm, true);
-      } else if (e.type == UPDATECOMMIT) {
-        updateLastCommitted();
-      } else if (e.type == STAGINGPROGRESS) {
-        checkNewPeers();
-      }
-    }
-  }
-
   /**
    * So far we use a simple implementation for catchup checking:
    * 1. If the latest rpc time of the remote peer is before 3 * max_timeout,
@@ -410,11 +442,14 @@ public class LeaderState {
         .collect(Collectors.toCollection(ArrayList::new));
   }
 
-  private void checkNewPeers() {
+  void submitCheckStagingEvent() {
+    eventQueue.submit(CHECK_STAGING_EVENT);
+  }
+
+  private void checkStaging() {
     if (!inStagingState()) {
-      // it is possible that the bootstrapping is done and we still have
-      // remaining STAGINGPROGRESS event to handle.
-      updateLastCommitted();
+      // it is possible that the bootstrapping is done. Then, fallback to UPDATE_COMMIT
+      UPDATE_COMMIT_EVENT.execute();
     } else {
       final long committedIndex = server.getState().getLog()
           .getLastCommittedIndex();
@@ -431,10 +466,14 @@ public class LeaderState {
   }
 
   boolean isBootStrappingPeer(RaftPeerId peerId) {
-    return inStagingState() && getStagingState().contains(peerId);
+    return Optional.ofNullable(stagingState).map(s -> s.contains(peerId)).orElse(false);
+  }
+
+  void submitUpdateCommitEvent() {
+    eventQueue.submit(UPDATE_COMMIT_EVENT);
   }
 
-  private void updateLastCommitted() {
+  private void updateCommit() {
     final RaftPeerId selfId = server.getId();
     final RaftConfiguration conf = server.getRaftConf();
 
@@ -575,7 +614,7 @@ public class LeaderState {
   /** @return true if the request is replied; otherwise, the reply is delayed, return false. */
   boolean replyPendingRequest(long logIndex, RaftClientReply reply, RetryCache.CacheEntry cacheEntry) {
     if (!pendingRequests.replyPendingRequest(logIndex, reply, cacheEntry)) {
-      submitUpdateStateEvent(UPDATE_COMMIT_EVENT);
+      submitUpdateCommitEvent();
       return false;
     }
     return true;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bbfb8754/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index c237917..3c9b2d4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -20,7 +20,6 @@ package org.apache.ratis.server.impl;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftLog.EntryWithData;
 import org.apache.ratis.server.storage.FileInfo;
@@ -45,7 +44,6 @@ import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX
 import static org.apache.ratis.util.LifeCycle.State.CLOSED;
 import static org.apache.ratis.util.LifeCycle.State.CLOSING;
 import static org.apache.ratis.util.LifeCycle.State.EXCEPTION;
-import static org.apache.ratis.util.LifeCycle.State.NEW;
 import static org.apache.ratis.util.LifeCycle.State.RUNNING;
 import static org.apache.ratis.util.LifeCycle.State.STARTING;
 
@@ -491,10 +489,11 @@ public class LogAppender {
   }
 
   protected void submitEventOnSuccessAppend() {
-    LeaderState.StateUpdateEvent e = follower.isAttendingVote() ?
-        LeaderState.UPDATE_COMMIT_EVENT :
-        LeaderState.STAGING_PROGRESS_EVENT;
-    leaderState.submitUpdateStateEvent(e);
+    if (follower.isAttendingVote()) {
+      leaderState.submitUpdateCommitEvent();
+    } else {
+      leaderState.submitCheckStagingEvent();
+    }
   }
 
   protected void checkSlowness() {
@@ -531,9 +530,7 @@ public class LogAppender {
     synchronized (server) {
       if (isAppenderRunning() && follower.isAttendingVote()
           && responseTerm > leaderState.getCurrentTerm()) {
-        leaderState.submitUpdateStateEvent(
-            new LeaderState.StateUpdateEvent(StateUpdateEventType.STEPDOWN,
-                responseTerm));
+        leaderState.submitStepDownEvent(responseTerm);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bbfb8754/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 7fde782..d4b32a1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -295,13 +295,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
   /**
    * Change the server state to Follower if necessary
    * @param newTerm The new term.
-   * @param sync We will call {@link ServerState#persistMetadata()} if this is
-   *             set to true and term/votedFor get updated.
    * @return if the term/votedFor should be updated to the new term
    * @throws IOException if term/votedFor persistence failed.
    */
-  synchronized boolean changeToFollower(long newTerm, boolean sync)
-      throws IOException {
+  private synchronized boolean changeToFollower(long newTerm) {
     final RaftPeerRole old = role.getCurrentRole();
     final boolean metadataUpdated = state.updateCurrentTerm(newTerm);
 
@@ -314,11 +311,13 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       }
       startHeartbeatMonitor();
     }
+    return metadataUpdated;
+  }
 
-    if (metadataUpdated && sync) {
+  synchronized void changeToFollowerAndPersistMetadata(long newTerm) throws IOException {
+    if (changeToFollower(newTerm)) {
       state.persistMetadata();
     }
-    return metadataUpdated;
   }
 
   private synchronized void shutdownLeaderState(boolean allowNull) {
@@ -546,9 +545,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
         cacheEntry.failWithReply(exceptionReply);
         // leader will step down here
         if (isLeader() && leaderState != null) {
-          leaderState.submitUpdateStateEvent(new LeaderState.StateUpdateEvent(
-              LeaderState.StateUpdateEventType.STEPDOWN,
-              leaderState.getCurrentTerm()));
+          leaderState.submitStepDownEvent();
         }
         return CompletableFuture.completedFuture(exceptionReply);
       }
@@ -777,7 +774,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
             getId(), role, candidateId, candidateTerm, state.getLeaderId(), state.getCurrentTerm(),
             isFollower()? heartbeatMonitor.getLastRpcTime().elapsedTimeMs() + "ms": null);
       } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
-        boolean termUpdated = changeToFollower(candidateTerm, false);
+        final boolean termUpdated = changeToFollower(candidateTerm);
         // see Section 5.4.1 Election restriction
         if (state.isLogUpToDate(candidateLastEntry)) {
           heartbeatMonitor.updateLastRpcTime(false);
@@ -910,7 +907,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
         }
         return CompletableFuture.completedFuture(reply);
       }
-      changeToFollower(leaderTerm, true);
+      changeToFollowerAndPersistMetadata(leaderTerm);
       state.setLeader(leaderId, "appendEntries");
 
       if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
@@ -1010,7 +1007,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
             " Reply: {}", getId(), reply);
         return reply;
       }
-      changeToFollower(leaderTerm, true);
+      changeToFollowerAndPersistMetadata(leaderTerm);
       state.setLeader(leaderId, "installSnapshot");
 
       if (lifeCycle.getCurrentState() == RUNNING) {
@@ -1062,10 +1059,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
         groupId, term, lastEntry);
   }
 
-  public synchronized void submitLocalSyncEvent() {
-    if (isLeader() && leaderState != null) {
-      leaderState.submitUpdateStateEvent(LeaderState.UPDATE_COMMIT_EVENT);
-    }
+  public void submitUpdateCommitEvent() {
+    Optional.ofNullable(leaderState).ifPresent(LeaderState::submitUpdateCommitEvent);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bbfb8754/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index c0d1cb9..715370b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -61,7 +61,7 @@ class RaftLogWorker implements Runnable {
 
   private final RaftStorage storage;
   private volatile LogOutputStream out;
-  private final RaftServerImpl raftServer;
+  private final Runnable submitUpdateCommitEvent;
   private final StateMachine stateMachine;
   private final Supplier<Timer> logFlushTimer;
 
@@ -86,7 +86,7 @@ class RaftLogWorker implements Runnable {
     this.name = selfId + "-" + getClass().getSimpleName();
     LOG.info("new {} for {}", name, storage);
 
-    this.raftServer = raftServer;
+    this.submitUpdateCommitEvent = raftServer != null? raftServer::submitUpdateCommitEvent: () -> {};
     this.stateMachine = raftServer != null? raftServer.getStateMachine(): null;
 
     this.storage = storage;
@@ -100,11 +100,8 @@ class RaftLogWorker implements Runnable {
     this.workerThread = new Thread(this, name);
 
     // Server Id can be null in unit tests
-    Supplier<String> serverId = () -> raftServer == null || raftServer.getId() == null
-        ? "null" : raftServer.getId().toString();
     this.logFlushTimer = JavaUtils.memoize(() -> RatisMetricsRegistry.getRegistry()
-        .timer(MetricRegistry.name(RaftLogWorker.class, serverId.get(),
-            "flush-time")));
+        .timer(MetricRegistry.name(RaftLogWorker.class, selfId.toString(), "flush-time")));
   }
 
   void start(long latestIndex, File openSegmentFile) throws IOException {
@@ -243,9 +240,7 @@ class RaftLogWorker implements Runnable {
   private void updateFlushedIndex() {
     flushedIndex = lastWrittenIndex;
     pendingFlushNum = 0;
-    if (raftServer != null) {
-      raftServer.submitLocalSyncEvent();
-    }
+    submitUpdateCommitEvent.run();
   }
 
   /**
@@ -288,9 +283,8 @@ class RaftLogWorker implements Runnable {
           // this.entry != entry iff the entry has state machine data
           this.stateMachineFuture = stateMachine.writeStateMachineData(entry);
         } catch (Throwable e) {
-          LOG.error("{}: writeStateMachineData failed for index:{} proto:{}",
-              raftServer.getId() ,entry.getIndex(),
-              ServerProtoUtils.toString(entry), e.getMessage());
+          LOG.error(name + ": writeStateMachineData failed for index " + entry.getIndex()
+              + ", entry=" + ServerProtoUtils.toLogEntryString(entry), e);
           throw e;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bbfb8754/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 4d59bf3..862e21f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -105,8 +105,7 @@ public class SegmentedRaftLog extends RaftLog {
   private final long segmentMaxSize;
 
   public SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server,
-      RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties)
-      throws IOException {
+      RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
     super(selfId, RaftServerConfigKeys.Log.Appender.bufferCapacity(properties)
         .getSizeInt());
     this.server = server;

Reply via email to