This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 051fe4e  RATIS-595. appendEntry future should be completed only after the entry is flushed.
051fe4e is described below

commit 051fe4e657d542f88c0dabbbd5a90908295f8964
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Jun 26 16:35:00 2019 +0800

    RATIS-595. appendEntry future should be completed only after the entry is flushed.
---
 .../org/apache/ratis/server/impl/LeaderState.java  |  2 +-
 .../ratis/server/impl/RaftServerConstants.java     |  4 +-
 .../org/apache/ratis/server/raftlog/RaftLog.java   |  8 +--
 .../ratis/server/raftlog/memory/MemoryRaftLog.java |  2 +-
 .../server/raftlog/segmented/SegmentedRaftLog.java | 18 +++--
 .../raftlog/segmented/SegmentedRaftLogWorker.java  | 78 ++++++++++++++++++----
 .../server/impl/RaftReconfigurationBaseTest.java   |  2 +-
 .../ratis/server/storage/RaftStorageTestUtils.java |  2 +-
 .../raftlog/segmented/TestSegmentedRaftLog.java    | 36 +++++-----
 9 files changed, 105 insertions(+), 47 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 13b7c8e..2271908 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -560,7 +560,7 @@ public class LeaderState {
   }
 
   private void updateCommit() {
-    getMajorityMin(FollowerInfo::getMatchIndex, raftLog::getLatestFlushedIndex)
+    getMajorityMin(FollowerInfo::getMatchIndex, raftLog::getFlushIndex)
         .ifPresent(m -> updateCommit(m.majority, m.min));
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
index 42eaed2..da05037 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -20,9 +20,9 @@ package org.apache.ratis.server.impl;
 import org.apache.ratis.server.raftlog.RaftLog;
 
 public interface RaftServerConstants {
-  /** @deprecated use {@link RaftLog#LEAST_VALID_LOG_INDEX} - 1. */
+  /** @deprecated use {@link RaftLog#INVALID_LOG_INDEX}. */
   @Deprecated
-  long INVALID_LOG_INDEX = RaftLog.LEAST_VALID_LOG_INDEX - 1;
+  long INVALID_LOG_INDEX = RaftLog.INVALID_LOG_INDEX;
   long DEFAULT_CALLID = 0;
 
   enum StartupOption {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index ae357dc..01474fd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -64,6 +64,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
 
   /** The least valid log index, i.e. the index used when writing to an empty log. */
   public static final long LEAST_VALID_LOG_INDEX = 0L;
+  public static final long INVALID_LOG_INDEX = LEAST_VALID_LOG_INDEX - 1;
 
   /**
    * The largest committed index. Note the last committed log may be included
@@ -118,7 +119,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
         // paper for details.
         final TermIndex entry = getTermIndex(majorityIndex);
         if (entry != null && entry.getTerm() == currentTerm) {
-          final long newCommitIndex = Math.min(majorityIndex, getLatestFlushedIndex());
+          final long newCommitIndex = Math.min(majorityIndex, getFlushIndex());
           if (newCommitIndex > oldCommittedIndex) {
             commitIndex.updateIncreasingly(newCommitIndex, traceIndexChange);
           }
@@ -368,10 +369,9 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
   protected abstract List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries);
 
   /**
-   * @return the index of the latest entry that has been flushed to the local
-   *         storage.
+   * @return the index of the last entry that has been flushed to the local storage.
    */
-  public abstract long getLatestFlushedIndex();
+  public abstract long getFlushIndex();
 
   /**
    * Write and flush the metadata (votedFor and term) into the meta file.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index f203282..f07aa1a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -203,7 +203,7 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  public long getLatestFlushedIndex() {
+  public long getFlushIndex() {
     return getNextIndex() - 1;
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 7ea75ce..6db41c7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -35,6 +35,7 @@ import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.StringUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -83,8 +84,13 @@ public class SegmentedRaftLog extends RaftLog {
     }
 
     void done() {
-      Preconditions.assertTrue(!future.isDone());
-      future.complete(getEndIndex());
+      completeFuture();
+    }
+
+    final void completeFuture() {
+      final boolean completed = future.complete(getEndIndex());
+      Preconditions.assertTrue(completed,
+          () -> this + " is already " + StringUtils.completableFuture2String(future, false));
     }
 
     void failed(IOException e) {
@@ -231,7 +237,7 @@ public class SegmentedRaftLog extends RaftLog {
       // segment's cache, should block the new entry appending or new segment
       // allocation.
       final RaftServerImpl s = server.get();
-      cache.evictCache(s.getFollowerNextIndices(), fileLogWorker.getFlushedIndex(), s.getState().getLastAppliedIndex());
+      cache.evictCache(s.getFollowerNextIndices(), fileLogWorker.getFlushIndex(), s.getState().getLastAppliedIndex());
     }
   }
 
@@ -386,8 +392,8 @@ public class SegmentedRaftLog extends RaftLog {
 
 
   @Override
-  public long getLatestFlushedIndex() {
-    return fileLogWorker.getFlushedIndex();
+  public long getFlushIndex() {
+    return fileLogWorker.getFlushIndex();
   }
 
   @Override
@@ -434,7 +440,7 @@ public class SegmentedRaftLog extends RaftLog {
   public String toString() {
     try(AutoCloseableLock readLock = readLock()) {
       if (isOpened()) {
-        return super.toString() + ",f" + getLatestFlushedIndex()
+        return super.toString() + ",f" + getFlushIndex()
             + ",i" + 
Optional.ofNullable(getLastEntryTermIndex()).map(TermIndex::getIndex).orElse(0L);
       } else {
         return super.toString();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index c4f6aa9..e3d181d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -26,11 +26,12 @@ import org.apache.ratis.metrics.impl.RatisMetricRegistry;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.metrics.RatisMetrics;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.server.raftlog.RaftLogIndex;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.SegmentFileInfo;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
@@ -43,10 +44,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 /**
@@ -92,11 +96,48 @@ class SegmentedRaftLogWorker implements Runnable {
     }
   }
 
+  static class WriteLogTasks {
+    private final Queue<WriteLog> q = new LinkedList<>();
+    private volatile long index;
+
+    void offerOrCompleteFuture(WriteLog writeLog) {
+      if (writeLog.getEndIndex() <= index || !offer(writeLog)) {
+        writeLog.completeFuture();
+      }
+    }
+
+    private synchronized boolean offer(WriteLog writeLog) {
+      if (writeLog.getEndIndex() <= index) { // compare again synchronized
+        return false;
+      }
+      q.offer(writeLog);
+      return true;
+    }
+
+    synchronized void updateIndex(long i) {
+      index = i;
+
+      for(;;) {
+        final Task peeked = q.peek();
+        if (peeked == null || peeked.getEndIndex() > index) {
+          return;
+        }
+        final Task polled = q.poll();
+        Preconditions.assertTrue(polled == peeked);
+        polled.completeFuture();
+      }
+    }
+  }
+
+  private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", this, s);
+  private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", this, s);
+
   private final String name;
   /**
    * The task queue accessed by rpc handler threads and the io worker thread.
    */
   private final DataBlockingQueue<Task> queue;
+  private final WriteLogTasks writeTasks = new WriteLogTasks();
   private volatile boolean running = true;
   private final Thread workerThread;
 
@@ -114,7 +155,7 @@ class SegmentedRaftLogWorker implements Runnable {
   /** the index of the last entry that has been written */
   private long lastWrittenIndex;
   /** the largest index of the entry that has been flushed */
-  private volatile long flushedIndex;
+  private final RaftLogIndex flushIndex = new RaftLogIndex("flushIndex", 0);
 
   private final int forceSyncNum;
 
@@ -167,7 +208,7 @@ class SegmentedRaftLogWorker implements Runnable {
   void start(long latestIndex, File openSegmentFile) throws IOException {
     LOG.trace("{} start(latestIndex={}, openSegmentFile={})", name, latestIndex, openSegmentFile);
     lastWrittenIndex = latestIndex;
-    flushedIndex = latestIndex;
+    flushIndex.setUnconditionally(latestIndex, infoIndexChange);
     if (openSegmentFile != null) {
       Preconditions.assertTrue(openSegmentFile.exists());
       out = new SegmentedRaftLogOutputStream(openSegmentFile, true, segmentMaxSize,
@@ -194,7 +235,7 @@ class SegmentedRaftLogWorker implements Runnable {
   void syncWithSnapshot(long lastSnapshotIndex) {
     queue.clear();
     lastWrittenIndex = lastSnapshotIndex;
-    flushedIndex = lastSnapshotIndex;
+    flushIndex.setUnconditionally(lastSnapshotIndex, infoIndexChange);
     pendingFlushNum = 0;
   }
 
@@ -312,13 +353,18 @@ class SegmentedRaftLogWorker implements Runnable {
       } finally {
         timerContext.stop();
       }
-      updateFlushedIndex();
+      updateFlushedIndexIncreasingly();
     }
   }
 
-  private void updateFlushedIndex() {
-    LOG.debug("{}: updateFlushedIndex {} -> {}", name, flushedIndex, lastWrittenIndex);
-    flushedIndex = lastWrittenIndex;
+  private void updateFlushedIndexIncreasingly() {
+    final long i = lastWrittenIndex;
+    flushIndex.updateIncreasingly(i, traceIndexChange);
+    postUpdateFlushedIndex();
+    writeTasks.updateIndex(i);
+  }
+
+  private void postUpdateFlushedIndex() {
     pendingFlushNum = 0;
     Optional.ofNullable(submitUpdateCommitEvent).ifPresent(Runnable::run);
   }
@@ -421,6 +467,11 @@ class SegmentedRaftLogWorker implements Runnable {
     }
 
     @Override
+    void done() {
+      writeTasks.offerOrCompleteFuture(this);
+    }
+
+    @Override
     public void execute() throws IOException {
       if (stateMachineDataPolicy.isSync() && stateMachineFuture != null) {
         stateMachineDataPolicy.getFromFuture(stateMachineFuture, () -> this + "-writeStateMachineData");
@@ -477,7 +528,7 @@ class SegmentedRaftLogWorker implements Runnable {
         FileUtils.deleteFile(openFile);
         LOG.info("{}: Deleted empty log segment {}", name, openFile);
       }
-      updateFlushedIndex();
+      updateFlushedIndexIncreasingly();
     }
 
     @Override
@@ -589,7 +640,8 @@ class SegmentedRaftLogWorker implements Runnable {
       if (stateMachineFuture != null) {
         IOUtils.getFromFuture(stateMachineFuture, () -> this + "-truncateStateMachineData");
       }
-      updateFlushedIndex();
+      flushIndex.setUnconditionally(lastWrittenIndex, infoIndexChange);
+      postUpdateFlushedIndex();
     }
 
     @Override
@@ -599,7 +651,7 @@ class SegmentedRaftLogWorker implements Runnable {
       } else if (segments.toDelete.length > 0) {
         return segments.toDelete[segments.toDelete.length - 1].endIndex;
       }
-      return RaftServerConstants.INVALID_LOG_INDEX;
+      return RaftLog.INVALID_LOG_INDEX;
     }
 
     @Override
@@ -608,7 +660,7 @@ class SegmentedRaftLogWorker implements Runnable {
     }
   }
 
-  long getFlushedIndex() {
-    return flushedIndex;
+  long getFlushIndex() {
+    return flushIndex.get();
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index d937950..ac022bf 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -532,7 +532,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
       }, 10, sleepTime, "confIndex", LOG);
 
       // wait till the old leader persist the new conf
-      JavaUtils.attempt(() -> log.getLatestFlushedIndex() >= confIndex,
+      JavaUtils.attempt(() -> log.getFlushIndex() >= confIndex,
           10, sleepTime, "FLUSH", LOG);
       final long committed = log.getLastCommittedIndex();
       Assert.assertTrue(committed < confIndex);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index 07283b0..eb52869 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -43,7 +43,7 @@ public interface RaftStorageTestUtils {
     final long flushed, committed;
     try(AutoCloseableLock readlock = log.readLock()) {
       last = log.getLastEntryTermIndex();
-      flushed = log.getLatestFlushedIndex();
+      flushed = log.getFlushIndex();
       committed = log.getLastCommittedIndex();
     }
     final StringBuilder b = new StringBuilder();
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index cda1043..353dffb 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -163,7 +163,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     // create RaftLog object and load log file
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // check if log entries are loaded correctly
       for (LogEntryProto e : entries) {
         LogEntryProto entry = raftLog.get(e.getIndex());
@@ -219,21 +219,21 @@ public class TestSegmentedRaftLog extends BaseTest {
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
       
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
     }
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // check if the raft log is correct
       checkEntries(raftLog, entries, 0, entries.size());
     }
 
     try (SegmentedRaftLog raftLog =
         new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       TermIndex lastTermIndex  = raftLog.getLastEntryTermIndex();
       IllegalStateException ex = null;
       try {
@@ -272,14 +272,14 @@ public class TestSegmentedRaftLog extends BaseTest {
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
       
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
     }
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // check if the raft log is correct
       checkEntries(raftLog, entries, 0, entries.size());
       Assert.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments());
@@ -294,7 +294,7 @@ public class TestSegmentedRaftLog extends BaseTest {
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
       
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
     }
@@ -308,7 +308,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       throws Exception {
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // truncate the log
       raftLog.truncate(fromIndex).join();
 
@@ -318,7 +318,7 @@ public class TestSegmentedRaftLog extends BaseTest {
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // check if the raft log is correct
       if (fromIndex > 0) {
         Assert.assertEquals(entries.get((int) (fromIndex - 1)),
@@ -402,7 +402,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     final RaftProperties p = new RaftProperties();
     RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap);
     try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, storage, -1, p)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
       final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
       final Long purged = f.get();
@@ -426,7 +426,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     
doCallRealMethod().when(server).failClientRequest(any(LogEntryProto.class));
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       entries.forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, entry));
       // append entries to the raftlog
       
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
@@ -442,7 +442,7 @@ public class TestSegmentedRaftLog extends BaseTest {
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       LOG.info("newEntries[0] = {}", newEntries.get(0));
       final int last = newEntries.size() - 1;
       LOG.info("newEntries[{}] = {}", last, newEntries.get(last));
@@ -454,19 +454,19 @@ public class TestSegmentedRaftLog extends BaseTest {
       Assert.assertEquals(newEntries.get(newEntries.size() - 1),
           getLastEntry(raftLog));
       Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
-          raftLog.getLatestFlushedIndex());
+          raftLog.getFlushIndex());
     }
 
     // load the raftlog again and check
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       checkEntries(raftLog, entries, 0, 650);
       checkEntries(raftLog, newEntries, 100, 100);
       Assert.assertEquals(newEntries.get(newEntries.size() - 1),
           getLastEntry(raftLog));
       Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
-          raftLog.getLatestFlushedIndex());
+          raftLog.getFlushIndex());
 
       SegmentedRaftLogCache cache = raftLog.getRaftLogCache();
       Assert.assertEquals(5, cache.getNumOfSegments());
@@ -480,7 +480,7 @@ public class TestSegmentedRaftLog extends BaseTest {
 
     final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing();
     try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, null, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
 
       int next = 0;
       long flush = -1;
@@ -544,7 +544,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     doNothing().when(server).shutdown(false);
     Throwable ex = null; // TimeoutIOException
     try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, sm, null, storage, -1, properties)) {
-      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // SegmentedRaftLogWorker should catch TimeoutIOException
       CompletableFuture<Long> f = raftLog.appendEntry(entry);
       // Wait for async writeStateMachineData to finish
@@ -567,7 +567,7 @@ public class TestSegmentedRaftLog extends BaseTest {
 
   void assertIndices(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) {
     LOG.info("assert expectedFlushIndex={}", expectedFlushIndex);
-    Assert.assertEquals(expectedFlushIndex, raftLog.getLatestFlushedIndex());
+    Assert.assertEquals(expectedFlushIndex, raftLog.getFlushIndex());
     LOG.info("assert expectedNextIndex={}", expectedNextIndex);
     Assert.assertEquals(expectedNextIndex, raftLog.getNextIndex());
   }

Reply via email to