This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new e493b60  RATIS-559. LogAppender.getPrevious throws IllegalStateException.
e493b60 is described below

commit e493b60b795ce0ec41affbb5052d7c6a3255dfff
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Fri May 24 14:06:50 2019 +0800

    RATIS-559. LogAppender.getPrevious throws IllegalStateException.
---
 .../org/apache/ratis/server/impl/LogAppender.java  | 51 +++++++++++++++++-----
 .../ratis/server/impl/RaftServerConstants.java     |  6 ++-
 .../org/apache/ratis/server/raftlog/RaftLog.java   |  3 ++
 3 files changed, 48 insertions(+), 12 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index ff13136..d5a6ec0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -169,23 +169,33 @@ public class LogAppender {
     return getFollower().getPeer().getId();
   }
 
-  private TermIndex getPrevious() {
-    final long nextIndex = follower.getNextIndex();
-    final TermIndex previous = raftLog.getTermIndex(nextIndex - 1);
+  private TermIndex getPrevious(long nextIndex) {
+    if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
+      return null;
+    }
+
+    final long previousIndex = nextIndex - 1;
+    final TermIndex previous = raftLog.getTermIndex(previousIndex);
     if (previous != null) {
       return previous;
     }
-    final long logStartIndex = raftLog.getStartIndex();
-    Preconditions.assertTrue(nextIndex == logStartIndex,
-        "%s: follower's nextIndex = %s != logStartIndex = %s", this, nextIndex, logStartIndex);
+
     final SnapshotInfo snapshot = server.getState().getLatestSnapshot();
-    return snapshot == null ? null : snapshot.getTermIndex();
+    if (snapshot != null) {
+      final TermIndex snapshotTermIndex = snapshot.getTermIndex();
+      if (snapshotTermIndex.getIndex() == previousIndex) {
+        return snapshotTermIndex;
+      }
+    }
+
+    return null;
   }
 
   protected AppendEntriesRequestProto createRequest(long callId) throws RaftLogIOException {
-    final TermIndex previous = getPrevious();
+    final TermIndex previous = getPrevious(follower.getNextIndex());
     final long heartbeatRemainingMs = getHeartbeatRemainingTime();
     if (heartbeatRemainingMs <= 0L) {
+      // heartbeat
       return leaderState.newAppendEntriesRequestProto(
           getFollowerId(), previous, Collections.emptyList(), !follower.isAttendingVote(), callId);
     }
@@ -193,7 +203,9 @@ public class LogAppender {
     Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");
 
     final long leaderNext = raftLog.getNextIndex();
-    for (long next = follower.getNextIndex(); leaderNext > next; ) {
+    final long followerNext = follower.getNextIndex();
+    final long halfMs = heartbeatRemainingMs/2;
+    for (long next = followerNext; leaderNext > next && getHeartbeatRemainingTime() - halfMs > 0; ) {
       if (!buffer.offer(raftLog.getEntryWithData(next++))) {
         break;
       }
@@ -202,13 +214,30 @@ public class LogAppender {
       return null;
     }
 
-    final List<LogEntryProto> protos = buffer.pollList(heartbeatRemainingMs, EntryWithData::getEntry,
-        (entry, time, exception) -> LOG.warn(this + ": Failed get " + entry + " in " + time, exception));
+    final List<LogEntryProto> protos = buffer.pollList(getHeartbeatRemainingTime(), EntryWithData::getEntry,
+        (entry, time, exception) -> LOG.warn("{}: Failed to get {} in {}: {}",
+            follower.getName(), entry, time, exception));
     buffer.clear();
+    assertProtos(protos, followerNext, previous);
     return leaderState.newAppendEntriesRequestProto(
         getFollowerId(), previous, protos, !follower.isAttendingVote(), callId);
   }
 
+  private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex previous) {
+    if (protos.isEmpty()) {
+      return;
+    }
+    final long firstIndex = protos.get(0).getIndex();
+    Preconditions.assertTrue(firstIndex == nextIndex,
+        () -> follower.getName() + ": firstIndex = " + firstIndex + " != nextIndex = " + nextIndex);
+    if (firstIndex > RaftLog.LEAST_VALID_LOG_INDEX) {
+      Objects.requireNonNull(previous,
+          () -> follower.getName() + ": Previous TermIndex not found for firstIndex = " + firstIndex);
+      Preconditions.assertTrue(previous.getIndex() == firstIndex - 1,
+          () -> follower.getName() + ": Previous = " + previous + " but firstIndex = " + firstIndex);
+    }
+  }
+
   /** Send an appendEntries RPC; retry indefinitely. */
   private AppendEntriesReplyProto sendAppendEntriesWithRetries()
       throws InterruptedException, InterruptedIOException, RaftLogIOException {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
index f70f98a..42eaed2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -17,8 +17,12 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.server.raftlog.RaftLog;
+
 public interface RaftServerConstants {
-  long INVALID_LOG_INDEX = -1;
+  /** @deprecated use {@link RaftLog#LEAST_VALID_LOG_INDEX} - 1. */
+  @Deprecated
+  long INVALID_LOG_INDEX = RaftLog.LEAST_VALID_LOG_INDEX - 1;
   long DEFAULT_CALLID = 0;
 
   enum StartupOption {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index bc35014..fe7625d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -60,6 +60,9 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
   private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", getSelfId(), s);
   private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", getSelfId(), s);
 
+  /** The least valid log index, i.e. the index used when writing to an empty log. */
+  public static final long LEAST_VALID_LOG_INDEX = 0L;
+
   /**
    * The largest committed index. Note the last committed log may be included
    * in the latest snapshot file.

Reply via email to