This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_optimize_reader
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_optimize_reader by this 
push:
     new dd242f5b23 move config and fix log. (#7304)
dd242f5b23 is described below

commit dd242f5b23262a82481712ddedd5bbc369cc0528
Author: ZhangHongYin <[email protected]>
AuthorDate: Wed Sep 14 14:27:01 2022 +0800

    move config and fix log. (#7304)
---
 .../iotdb/consensus/config/MultiLeaderConfig.java  | 13 ++++-
 .../multileader/logdispatcher/IndexController.java | 11 ++--
 .../multileader/logdispatcher/LogDispatcher.java   | 24 ++++----
 .../multileader/MultiLeaderConsensusTest.java      | 64 ++++++++--------------
 .../logdispatcher/IndexControllerTest.java         | 41 +++++++-------
 .../multileader/logdispatcher/SyncStatusTest.java  | 13 +++--
 6 files changed, 85 insertions(+), 81 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 1c12022dc5..2a334b0b84 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -204,6 +204,7 @@ public class MultiLeaderConfig {
     private final long maxRetryWaitTimeMs;
     private final long walThrottleThreshold;
     private final long throttleTimeOutMs;
+    private final long checkpointGap;
 
     private Replication(
         int maxPendingRequestNumPerNode,
@@ -213,7 +214,8 @@ public class MultiLeaderConfig {
         long basicRetryWaitTimeMs,
         long maxRetryWaitTimeMs,
         long walThrottleThreshold,
-        long throttleTimeOutMs) {
+        long throttleTimeOutMs,
+        long checkpointGap) {
       this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
       this.maxRequestPerBatch = maxRequestPerBatch;
       this.maxPendingBatch = maxPendingBatch;
@@ -222,6 +224,7 @@ public class MultiLeaderConfig {
       this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
       this.walThrottleThreshold = walThrottleThreshold;
       this.throttleTimeOutMs = throttleTimeOutMs;
+      this.checkpointGap = checkpointGap;
     }
 
     public int getMaxPendingRequestNumPerNode() {
@@ -256,6 +259,10 @@ public class MultiLeaderConfig {
       return throttleTimeOutMs;
     }
 
+    public long getCheckpointGap() {
+      return checkpointGap;
+    }
+
     public static Replication.Builder newBuilder() {
       return new Replication.Builder();
     }
@@ -271,6 +278,7 @@ public class MultiLeaderConfig {
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
       private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L;
       private long throttleTimeOutMs = TimeUnit.SECONDS.toMillis(30);
+      private long checkpointGap = 500;
 
       public Replication.Builder setMaxPendingRequestNumPerNode(int 
maxPendingRequestNumPerNode) {
         this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
@@ -322,7 +330,8 @@ public class MultiLeaderConfig {
             basicRetryWaitTimeMs,
             maxRetryWaitTimeMs,
             walThrottleThreshold,
-            throttleTimeOutMs);
+            throttleTimeOutMs,
+            checkpointGap);
       }
     }
   }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
index 62cc0e5894..027bf6aa34 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
@@ -38,8 +38,6 @@ public class IndexController {
 
   private final Logger logger = LoggerFactory.getLogger(IndexController.class);
 
-  public static final int FLUSH_INTERVAL = 500;
-
   private long lastFlushedIndex;
   private long currentIndex;
 
@@ -48,9 +46,12 @@ public class IndexController {
   private final String storageDir;
   private final String prefix;
 
-  public IndexController(String storageDir, String prefix) {
+  private final long checkpointGap;
+
+  public IndexController(String storageDir, String prefix, long checkpointGap) 
{
     this.storageDir = storageDir;
     this.prefix = prefix + '-';
+    this.checkpointGap = checkpointGap;
     restore();
   }
 
@@ -87,13 +88,13 @@ public class IndexController {
   }
 
   private void checkPersist() {
-    if (currentIndex - lastFlushedIndex >= FLUSH_INTERVAL) {
+    if (currentIndex - lastFlushedIndex >= checkpointGap) {
       persist();
     }
   }
 
   private void persist() {
-    long flushIndex = currentIndex - currentIndex % FLUSH_INTERVAL;
+    long flushIndex = currentIndex - currentIndex % checkpointGap;
     File oldFile = new File(storageDir, prefix + lastFlushedIndex);
     File newFile = new File(storageDir, prefix + flushIndex);
     try {
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 2973e4e3ce..70b419af03 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -129,6 +129,7 @@ public class LogDispatcher {
 
   public class LogDispatcherThread implements Runnable {
     private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
+    private static final long START_INDEX = 1;
     private final MultiLeaderConfig config;
     private final Peer peer;
     private final IndexController controller;
@@ -144,7 +145,6 @@ public class LogDispatcher {
     private volatile boolean stopped = false;
 
     private ConsensusReqReader.ReqIterator walEntryiterator;
-    private long iteratorIndex = 1;
 
     public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
       this.peer = peer;
@@ -153,9 +153,11 @@ public class LogDispatcher {
           new 
ArrayBlockingQueue<>(config.getReplication().getMaxPendingRequestNumPerNode());
       this.controller =
           new IndexController(
-              impl.getStorageDir(), 
Utils.fromTEndPointToString(peer.getEndpoint()));
+              impl.getStorageDir(),
+              Utils.fromTEndPointToString(peer.getEndpoint()),
+              config.getReplication().getCheckpointGap());
       this.syncStatus = new SyncStatus(controller, config);
-      this.walEntryiterator = reader.getReqIterator(iteratorIndex);
+      this.walEntryiterator = reader.getReqIterator(START_INDEX);
     }
 
     public IndexController getController() {
@@ -221,7 +223,11 @@ public class LogDispatcher {
       // update safely deleted search index to delete outdated info,
       // indicating that insert nodes whose search index are before this value 
can be deleted
       // safely
-      
reader.setSafelyDeletedSearchIndex(impl.getCurrentSafelyDeletedSearchIndex());
+      long currentSafelyDeletedSearchIndex =
+          impl.getCurrentSafelyDeletedSearchIndex()
+              / config.getReplication().getCheckpointGap()
+              * config.getReplication().getCheckpointGap();
+      reader.setSafelyDeletedSearchIndex(currentSafelyDeletedSearchIndex);
       // notify
       if (impl.unblockWrite()) {
         impl.signal();
@@ -340,16 +346,12 @@ public class LogDispatcher {
         long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
       logger.debug(
           String.format(
-              "DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d, 
iteratorIndex: %d",
-              peer.getGroupId().getId(),
-              peer.getEndpoint().ip,
-              currentIndex,
-              maxIndex,
-              iteratorIndex));
+              "DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d",
+              peer.getGroupId().getId(), peer.getEndpoint().getIp(), 
currentIndex, maxIndex));
+      // targetIndex is the index of request that we need to find
       long targetIndex = currentIndex;
       // Even if there is no WAL files, these code won't produce error.
       walEntryiterator.skipTo(targetIndex);
-
       while (targetIndex < maxIndex
           && logBatches.size() < 
config.getReplication().getMaxRequestPerBatch()) {
         logger.debug("construct from WAL for one Entry, index : {}", 
targetIndex);
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
index fdc1990dbc..a170b5a269 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.common.ConsensusGroup;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
-import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
 import org.apache.iotdb.consensus.multileader.util.TestEntry;
 import org.apache.iotdb.consensus.multileader.util.TestStateMachine;
 
@@ -45,7 +44,7 @@ import java.util.Arrays;
 import java.util.List;
 
 public class MultiLeaderConsensusTest {
-
+  private static final long CHECK_POINT_GAP = 500;
   private final Logger logger = 
LoggerFactory.getLogger(MultiLeaderConsensusTest.class);
 
   private final ConsensusGroupId gid = new DataRegionId(1);
@@ -124,7 +123,7 @@ public class MultiLeaderConsensusTest {
     Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
     Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
 
-    for (int i = 0; i < IndexController.FLUSH_INTERVAL; i++) {
+    for (int i = 0; i < CHECK_POINT_GAP; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
       servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
       servers.get(2).write(gid, new TestEntry(i, peers.get(2)));
@@ -135,8 +134,7 @@ public class MultiLeaderConsensusTest {
 
     for (int i = 0; i < 3; i++) {
       long start = System.currentTimeMillis();
-      while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex()
-          < IndexController.FLUSH_INTERVAL) {
+      while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex() 
< CHECK_POINT_GAP) {
         long current = System.currentTimeMillis();
         if ((current - start) > 60 * 1000) {
           Assert.fail("Unable to replicate entries");
@@ -146,20 +144,14 @@ public class MultiLeaderConsensusTest {
     }
 
     Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL,
-        servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL,
-        servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL,
-        servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL * 3, 
stateMachines.get(0).getRequestSet().size());
+        CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
     Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL * 3, 
stateMachines.get(1).getRequestSet().size());
+        CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
     Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL * 3, 
stateMachines.get(2).getRequestSet().size());
+        CHECK_POINT_GAP, 
servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+    Assert.assertEquals(CHECK_POINT_GAP * 3, 
stateMachines.get(0).getRequestSet().size());
+    Assert.assertEquals(CHECK_POINT_GAP * 3, 
stateMachines.get(1).getRequestSet().size());
+    Assert.assertEquals(CHECK_POINT_GAP * 3, 
stateMachines.get(2).getRequestSet().size());
     Assert.assertEquals(stateMachines.get(0).getData(), 
stateMachines.get(1).getData());
     Assert.assertEquals(stateMachines.get(2).getData(), 
stateMachines.get(1).getData());
 
@@ -170,14 +162,13 @@ public class MultiLeaderConsensusTest {
     Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
     Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
 
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
servers.get(1).getImpl(gid).getIndex());
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
servers.get(2).getImpl(gid).getIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(2).getImpl(gid).getIndex());
 
     for (int i = 0; i < 3; i++) {
       long start = System.currentTimeMillis();
-      while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex()
-          < IndexController.FLUSH_INTERVAL) {
+      while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex() 
< CHECK_POINT_GAP) {
         long current = System.currentTimeMillis();
         if ((current - start) > 60 * 1000) {
           Assert.fail("Unable to recover entries");
@@ -187,14 +178,11 @@ public class MultiLeaderConsensusTest {
     }
 
     Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL,
-        servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+        CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
     Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL,
-        servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+        CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
     Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL,
-        servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+        CHECK_POINT_GAP, 
servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
   }
 
   /**
@@ -211,7 +199,7 @@ public class MultiLeaderConsensusTest {
     Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
     Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
 
-    for (int i = 0; i < IndexController.FLUSH_INTERVAL; i++) {
+    for (int i = 0; i < CHECK_POINT_GAP; i++) {
       servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
       servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
       Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
@@ -230,16 +218,15 @@ public class MultiLeaderConsensusTest {
     Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
     Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
 
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
servers.get(0).getImpl(gid).getIndex());
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
servers.get(1).getImpl(gid).getIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getIndex());
     Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
 
     for (int i = 0; i < 2; i++) {
       long start = System.currentTimeMillis();
-      // should be [IndexController.FLUSH_INTERVAL, 
IndexController.FLUSH_INTERVAL * 2 - 1] after
+      // should be [CHECK_POINT_GAP, CHECK_POINT_GAP * 2 - 1] after
       // replicating all entries
-      while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex()
-          < IndexController.FLUSH_INTERVAL) {
+      while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex() 
< CHECK_POINT_GAP) {
         long current = System.currentTimeMillis();
         if ((current - start) > 60 * 1000) {
           logger.error("{}", 
servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
@@ -249,12 +236,9 @@ public class MultiLeaderConsensusTest {
       }
     }
 
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL * 2, 
stateMachines.get(0).getRequestSet().size());
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL * 2, 
stateMachines.get(1).getRequestSet().size());
-    Assert.assertEquals(
-        IndexController.FLUSH_INTERVAL * 2, 
stateMachines.get(2).getRequestSet().size());
+    Assert.assertEquals(CHECK_POINT_GAP * 2, 
stateMachines.get(0).getRequestSet().size());
+    Assert.assertEquals(CHECK_POINT_GAP * 2, 
stateMachines.get(1).getRequestSet().size());
+    Assert.assertEquals(CHECK_POINT_GAP * 2, 
stateMachines.get(2).getRequestSet().size());
 
     Assert.assertEquals(stateMachines.get(0).getData(), 
stateMachines.get(1).getData());
     Assert.assertEquals(stateMachines.get(2).getData(), 
stateMachines.get(1).getData());
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
index aa6f63783f..2675470ee5 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
@@ -33,6 +33,8 @@ public class IndexControllerTest {
   private static final File storageDir = new File("target" + 
java.io.File.separator + "test");
   private static final String prefix = "version";
 
+  private static final long CHECK_POINT_GAP = 500;
+
   @Before
   public void setUp() throws IOException {
     FileUtils.createDirectories(storageDir);
@@ -46,37 +48,38 @@ public class IndexControllerTest {
   /** test indexController when incrementIntervalAfterRestart == false */
   @Test
   public void testIncrementIntervalAfterRestart() {
-    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix);
+    IndexController controller =
+        new IndexController(storageDir.getAbsolutePath(), prefix, 
CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    controller.updateAndGet(IndexController.FLUSH_INTERVAL - 1);
+    controller.updateAndGet(CHECK_POINT_GAP - 1);
 
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL - 1, 
controller.getCurrentIndex());
+    Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix);
+    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 
CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    controller.updateAndGet(IndexController.FLUSH_INTERVAL + 1);
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL + 1, 
controller.getCurrentIndex());
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
controller.getLastFlushedIndex());
+    controller.updateAndGet(CHECK_POINT_GAP + 1);
+    Assert.assertEquals(CHECK_POINT_GAP + 1, controller.getCurrentIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix);
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
controller.getCurrentIndex());
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
controller.getLastFlushedIndex());
+    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 
CHECK_POINT_GAP);
+    Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
-    controller.updateAndGet(IndexController.FLUSH_INTERVAL * 2 - 1);
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2 - 1, 
controller.getCurrentIndex());
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
controller.getLastFlushedIndex());
+    controller.updateAndGet(CHECK_POINT_GAP * 2 - 1);
+    Assert.assertEquals(CHECK_POINT_GAP * 2 - 1, controller.getCurrentIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix);
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
controller.getCurrentIndex());
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
controller.getLastFlushedIndex());
+    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 
CHECK_POINT_GAP);
+    Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
+    Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
-    controller.updateAndGet(IndexController.FLUSH_INTERVAL * 2 + 1);
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2 + 1, 
controller.getCurrentIndex());
-    Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2, 
controller.getLastFlushedIndex());
+    controller.updateAndGet(CHECK_POINT_GAP * 2 + 1);
+    Assert.assertEquals(CHECK_POINT_GAP * 2 + 1, controller.getCurrentIndex());
+    Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
   }
 }
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index db6b03377a..f7c90729f8 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -40,6 +40,7 @@ public class SyncStatusTest {
   private static final File storageDir = new File("target" + 
java.io.File.separator + "test");
   private static final String prefix = "version";
   private static final MultiLeaderConfig config = new 
MultiLeaderConfig.Builder().build();
+  private static final long CHECK_POINT_GAP = 500;
 
   @Before
   public void setUp() throws IOException {
@@ -54,7 +55,8 @@ public class SyncStatusTest {
   /** Confirm success from front to back */
   @Test
   public void sequenceTest() throws InterruptedException {
-    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix);
+    IndexController controller =
+        new IndexController(storageDir.getAbsolutePath(), prefix, 
CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);
@@ -79,7 +81,8 @@ public class SyncStatusTest {
   /** Confirm success from back to front */
   @Test
   public void reverseTest() throws InterruptedException {
-    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix);
+    IndexController controller =
+        new IndexController(storageDir.getAbsolutePath(), prefix, 
CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -111,7 +114,8 @@ public class SyncStatusTest {
   /** Confirm success first from front to back, then back to front */
   @Test
   public void mixedTest() throws InterruptedException {
-    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix);
+    IndexController controller =
+        new IndexController(storageDir.getAbsolutePath(), prefix, 
CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -155,7 +159,8 @@ public class SyncStatusTest {
   /** Test Blocking while addNextBatch */
   @Test
   public void waitTest() throws InterruptedException, ExecutionException {
-    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix);
+    IndexController controller =
+        new IndexController(storageDir.getAbsolutePath(), prefix, 
CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);

Reply via email to