This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_optimize_reader
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_optimize_reader by this
push:
new dd242f5b23 move config and fix log. (#7304)
dd242f5b23 is described below
commit dd242f5b23262a82481712ddedd5bbc369cc0528
Author: ZhangHongYin <[email protected]>
AuthorDate: Wed Sep 14 14:27:01 2022 +0800
move config and fix log. (#7304)
---
.../iotdb/consensus/config/MultiLeaderConfig.java | 13 ++++-
.../multileader/logdispatcher/IndexController.java | 11 ++--
.../multileader/logdispatcher/LogDispatcher.java | 24 ++++----
.../multileader/MultiLeaderConsensusTest.java | 64 ++++++++--------------
.../logdispatcher/IndexControllerTest.java | 41 +++++++-------
.../multileader/logdispatcher/SyncStatusTest.java | 13 +++--
6 files changed, 85 insertions(+), 81 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 1c12022dc5..2a334b0b84 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -204,6 +204,7 @@ public class MultiLeaderConfig {
private final long maxRetryWaitTimeMs;
private final long walThrottleThreshold;
private final long throttleTimeOutMs;
+ private final long checkpointGap;
private Replication(
int maxPendingRequestNumPerNode,
@@ -213,7 +214,8 @@ public class MultiLeaderConfig {
long basicRetryWaitTimeMs,
long maxRetryWaitTimeMs,
long walThrottleThreshold,
- long throttleTimeOutMs) {
+ long throttleTimeOutMs,
+ long checkpointGap) {
this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
this.maxRequestPerBatch = maxRequestPerBatch;
this.maxPendingBatch = maxPendingBatch;
@@ -222,6 +224,7 @@ public class MultiLeaderConfig {
this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
this.walThrottleThreshold = walThrottleThreshold;
this.throttleTimeOutMs = throttleTimeOutMs;
+ this.checkpointGap = checkpointGap;
}
public int getMaxPendingRequestNumPerNode() {
@@ -256,6 +259,10 @@ public class MultiLeaderConfig {
return throttleTimeOutMs;
}
+ public long getCheckpointGap() {
+ return checkpointGap;
+ }
+
public static Replication.Builder newBuilder() {
return new Replication.Builder();
}
@@ -271,6 +278,7 @@ public class MultiLeaderConfig {
private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L;
private long throttleTimeOutMs = TimeUnit.SECONDS.toMillis(30);
+ private long checkpointGap = 500;
public Replication.Builder setMaxPendingRequestNumPerNode(int
maxPendingRequestNumPerNode) {
this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
@@ -322,7 +330,8 @@ public class MultiLeaderConfig {
basicRetryWaitTimeMs,
maxRetryWaitTimeMs,
walThrottleThreshold,
- throttleTimeOutMs);
+ throttleTimeOutMs,
+ checkpointGap);
}
}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
index 62cc0e5894..027bf6aa34 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
@@ -38,8 +38,6 @@ public class IndexController {
private final Logger logger = LoggerFactory.getLogger(IndexController.class);
- public static final int FLUSH_INTERVAL = 500;
-
private long lastFlushedIndex;
private long currentIndex;
@@ -48,9 +46,12 @@ public class IndexController {
private final String storageDir;
private final String prefix;
- public IndexController(String storageDir, String prefix) {
+ private final long checkpointGap;
+
+ public IndexController(String storageDir, String prefix, long checkpointGap)
{
this.storageDir = storageDir;
this.prefix = prefix + '-';
+ this.checkpointGap = checkpointGap;
restore();
}
@@ -87,13 +88,13 @@ public class IndexController {
}
private void checkPersist() {
- if (currentIndex - lastFlushedIndex >= FLUSH_INTERVAL) {
+ if (currentIndex - lastFlushedIndex >= checkpointGap) {
persist();
}
}
private void persist() {
- long flushIndex = currentIndex - currentIndex % FLUSH_INTERVAL;
+ long flushIndex = currentIndex - currentIndex % checkpointGap;
File oldFile = new File(storageDir, prefix + lastFlushedIndex);
File newFile = new File(storageDir, prefix + flushIndex);
try {
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 2973e4e3ce..70b419af03 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -129,6 +129,7 @@ public class LogDispatcher {
public class LogDispatcherThread implements Runnable {
private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
+ private static final long START_INDEX = 1;
private final MultiLeaderConfig config;
private final Peer peer;
private final IndexController controller;
@@ -144,7 +145,6 @@ public class LogDispatcher {
private volatile boolean stopped = false;
private ConsensusReqReader.ReqIterator walEntryiterator;
- private long iteratorIndex = 1;
public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
this.peer = peer;
@@ -153,9 +153,11 @@ public class LogDispatcher {
new
ArrayBlockingQueue<>(config.getReplication().getMaxPendingRequestNumPerNode());
this.controller =
new IndexController(
- impl.getStorageDir(),
Utils.fromTEndPointToString(peer.getEndpoint()));
+ impl.getStorageDir(),
+ Utils.fromTEndPointToString(peer.getEndpoint()),
+ config.getReplication().getCheckpointGap());
this.syncStatus = new SyncStatus(controller, config);
- this.walEntryiterator = reader.getReqIterator(iteratorIndex);
+ this.walEntryiterator = reader.getReqIterator(START_INDEX);
}
public IndexController getController() {
@@ -221,7 +223,11 @@ public class LogDispatcher {
// update safely deleted search index to delete outdated info,
// indicating that insert nodes whose search index are before this value
can be deleted
// safely
-
reader.setSafelyDeletedSearchIndex(impl.getCurrentSafelyDeletedSearchIndex());
+ long currentSafelyDeletedSearchIndex =
+ impl.getCurrentSafelyDeletedSearchIndex()
+ / config.getReplication().getCheckpointGap()
+ * config.getReplication().getCheckpointGap();
+ reader.setSafelyDeletedSearchIndex(currentSafelyDeletedSearchIndex);
// notify
if (impl.unblockWrite()) {
impl.signal();
@@ -340,16 +346,12 @@ public class LogDispatcher {
long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
logger.debug(
String.format(
- "DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d,
iteratorIndex: %d",
- peer.getGroupId().getId(),
- peer.getEndpoint().ip,
- currentIndex,
- maxIndex,
- iteratorIndex));
+ "DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d",
+ peer.getGroupId().getId(), peer.getEndpoint().getIp(),
currentIndex, maxIndex));
+ // targetIndex is the index of request that we need to find
long targetIndex = currentIndex;
// Even if there is no WAL files, these code won't produce error.
walEntryiterator.skipTo(targetIndex);
-
while (targetIndex < maxIndex
&& logBatches.size() <
config.getReplication().getMaxRequestPerBatch()) {
logger.debug("construct from WAL for one Entry, index : {}",
targetIndex);
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
index fdc1990dbc..a170b5a269 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.common.ConsensusGroup;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
-import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
import org.apache.iotdb.consensus.multileader.util.TestEntry;
import org.apache.iotdb.consensus.multileader.util.TestStateMachine;
@@ -45,7 +44,7 @@ import java.util.Arrays;
import java.util.List;
public class MultiLeaderConsensusTest {
-
+ private static final long CHECK_POINT_GAP = 500;
private final Logger logger =
LoggerFactory.getLogger(MultiLeaderConsensusTest.class);
private final ConsensusGroupId gid = new DataRegionId(1);
@@ -124,7 +123,7 @@ public class MultiLeaderConsensusTest {
Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
- for (int i = 0; i < IndexController.FLUSH_INTERVAL; i++) {
+ for (int i = 0; i < CHECK_POINT_GAP; i++) {
servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
servers.get(2).write(gid, new TestEntry(i, peers.get(2)));
@@ -135,8 +134,7 @@ public class MultiLeaderConsensusTest {
for (int i = 0; i < 3; i++) {
long start = System.currentTimeMillis();
- while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex()
- < IndexController.FLUSH_INTERVAL) {
+ while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex()
< CHECK_POINT_GAP) {
long current = System.currentTimeMillis();
if ((current - start) > 60 * 1000) {
Assert.fail("Unable to replicate entries");
@@ -146,20 +144,14 @@ public class MultiLeaderConsensusTest {
}
Assert.assertEquals(
- IndexController.FLUSH_INTERVAL,
- servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL,
- servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL,
- servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL * 3,
stateMachines.get(0).getRequestSet().size());
+ CHECK_POINT_GAP,
servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
Assert.assertEquals(
- IndexController.FLUSH_INTERVAL * 3,
stateMachines.get(1).getRequestSet().size());
+ CHECK_POINT_GAP,
servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
Assert.assertEquals(
- IndexController.FLUSH_INTERVAL * 3,
stateMachines.get(2).getRequestSet().size());
+ CHECK_POINT_GAP,
servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+ Assert.assertEquals(CHECK_POINT_GAP * 3,
stateMachines.get(0).getRequestSet().size());
+ Assert.assertEquals(CHECK_POINT_GAP * 3,
stateMachines.get(1).getRequestSet().size());
+ Assert.assertEquals(CHECK_POINT_GAP * 3,
stateMachines.get(2).getRequestSet().size());
Assert.assertEquals(stateMachines.get(0).getData(),
stateMachines.get(1).getData());
Assert.assertEquals(stateMachines.get(2).getData(),
stateMachines.get(1).getData());
@@ -170,14 +162,13 @@ public class MultiLeaderConsensusTest {
Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
- Assert.assertEquals(IndexController.FLUSH_INTERVAL,
servers.get(0).getImpl(gid).getIndex());
- Assert.assertEquals(IndexController.FLUSH_INTERVAL,
servers.get(1).getImpl(gid).getIndex());
- Assert.assertEquals(IndexController.FLUSH_INTERVAL,
servers.get(2).getImpl(gid).getIndex());
+ Assert.assertEquals(CHECK_POINT_GAP,
servers.get(0).getImpl(gid).getIndex());
+ Assert.assertEquals(CHECK_POINT_GAP,
servers.get(1).getImpl(gid).getIndex());
+ Assert.assertEquals(CHECK_POINT_GAP,
servers.get(2).getImpl(gid).getIndex());
for (int i = 0; i < 3; i++) {
long start = System.currentTimeMillis();
- while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex()
- < IndexController.FLUSH_INTERVAL) {
+ while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex()
< CHECK_POINT_GAP) {
long current = System.currentTimeMillis();
if ((current - start) > 60 * 1000) {
Assert.fail("Unable to recover entries");
@@ -187,14 +178,11 @@ public class MultiLeaderConsensusTest {
}
Assert.assertEquals(
- IndexController.FLUSH_INTERVAL,
- servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+ CHECK_POINT_GAP,
servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
Assert.assertEquals(
- IndexController.FLUSH_INTERVAL,
- servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+ CHECK_POINT_GAP,
servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
Assert.assertEquals(
- IndexController.FLUSH_INTERVAL,
- servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+ CHECK_POINT_GAP,
servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
}
/**
@@ -211,7 +199,7 @@ public class MultiLeaderConsensusTest {
Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
- for (int i = 0; i < IndexController.FLUSH_INTERVAL; i++) {
+ for (int i = 0; i < CHECK_POINT_GAP; i++) {
servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
@@ -230,16 +218,15 @@ public class MultiLeaderConsensusTest {
Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
- Assert.assertEquals(IndexController.FLUSH_INTERVAL,
servers.get(0).getImpl(gid).getIndex());
- Assert.assertEquals(IndexController.FLUSH_INTERVAL,
servers.get(1).getImpl(gid).getIndex());
+ Assert.assertEquals(CHECK_POINT_GAP,
servers.get(0).getImpl(gid).getIndex());
+ Assert.assertEquals(CHECK_POINT_GAP,
servers.get(1).getImpl(gid).getIndex());
Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
for (int i = 0; i < 2; i++) {
long start = System.currentTimeMillis();
- // should be [IndexController.FLUSH_INTERVAL,
IndexController.FLUSH_INTERVAL * 2 - 1] after
+ // should be [CHECK_POINT_GAP, CHECK_POINT_GAP * 2 - 1] after
// replicating all entries
- while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex()
- < IndexController.FLUSH_INTERVAL) {
+ while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex()
< CHECK_POINT_GAP) {
long current = System.currentTimeMillis();
if ((current - start) > 60 * 1000) {
logger.error("{}",
servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
@@ -249,12 +236,9 @@ public class MultiLeaderConsensusTest {
}
}
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL * 2,
stateMachines.get(0).getRequestSet().size());
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL * 2,
stateMachines.get(1).getRequestSet().size());
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL * 2,
stateMachines.get(2).getRequestSet().size());
+ Assert.assertEquals(CHECK_POINT_GAP * 2,
stateMachines.get(0).getRequestSet().size());
+ Assert.assertEquals(CHECK_POINT_GAP * 2,
stateMachines.get(1).getRequestSet().size());
+ Assert.assertEquals(CHECK_POINT_GAP * 2,
stateMachines.get(2).getRequestSet().size());
Assert.assertEquals(stateMachines.get(0).getData(),
stateMachines.get(1).getData());
Assert.assertEquals(stateMachines.get(2).getData(),
stateMachines.get(1).getData());
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
index aa6f63783f..2675470ee5 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
@@ -33,6 +33,8 @@ public class IndexControllerTest {
private static final File storageDir = new File("target" +
java.io.File.separator + "test");
private static final String prefix = "version";
+ private static final long CHECK_POINT_GAP = 500;
+
@Before
public void setUp() throws IOException {
FileUtils.createDirectories(storageDir);
@@ -46,37 +48,38 @@ public class IndexControllerTest {
/** test indexController when incrementIntervalAfterRestart == false */
@Test
public void testIncrementIntervalAfterRestart() {
- IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix);
+ IndexController controller =
+ new IndexController(storageDir.getAbsolutePath(), prefix,
CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- controller.updateAndGet(IndexController.FLUSH_INTERVAL - 1);
+ controller.updateAndGet(CHECK_POINT_GAP - 1);
- Assert.assertEquals(IndexController.FLUSH_INTERVAL - 1,
controller.getCurrentIndex());
+ Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix);
+ controller = new IndexController(storageDir.getAbsolutePath(), prefix,
CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- controller.updateAndGet(IndexController.FLUSH_INTERVAL + 1);
- Assert.assertEquals(IndexController.FLUSH_INTERVAL + 1,
controller.getCurrentIndex());
- Assert.assertEquals(IndexController.FLUSH_INTERVAL,
controller.getLastFlushedIndex());
+ controller.updateAndGet(CHECK_POINT_GAP + 1);
+ Assert.assertEquals(CHECK_POINT_GAP + 1, controller.getCurrentIndex());
+ Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix);
- Assert.assertEquals(IndexController.FLUSH_INTERVAL,
controller.getCurrentIndex());
- Assert.assertEquals(IndexController.FLUSH_INTERVAL,
controller.getLastFlushedIndex());
+ controller = new IndexController(storageDir.getAbsolutePath(), prefix,
CHECK_POINT_GAP);
+ Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
+ Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
- controller.updateAndGet(IndexController.FLUSH_INTERVAL * 2 - 1);
- Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2 - 1,
controller.getCurrentIndex());
- Assert.assertEquals(IndexController.FLUSH_INTERVAL,
controller.getLastFlushedIndex());
+ controller.updateAndGet(CHECK_POINT_GAP * 2 - 1);
+ Assert.assertEquals(CHECK_POINT_GAP * 2 - 1, controller.getCurrentIndex());
+ Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix);
- Assert.assertEquals(IndexController.FLUSH_INTERVAL,
controller.getCurrentIndex());
- Assert.assertEquals(IndexController.FLUSH_INTERVAL,
controller.getLastFlushedIndex());
+ controller = new IndexController(storageDir.getAbsolutePath(), prefix,
CHECK_POINT_GAP);
+ Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
+ Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
- controller.updateAndGet(IndexController.FLUSH_INTERVAL * 2 + 1);
- Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2 + 1,
controller.getCurrentIndex());
- Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2,
controller.getLastFlushedIndex());
+ controller.updateAndGet(CHECK_POINT_GAP * 2 + 1);
+ Assert.assertEquals(CHECK_POINT_GAP * 2 + 1, controller.getCurrentIndex());
+ Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
}
}
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index db6b03377a..f7c90729f8 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -40,6 +40,7 @@ public class SyncStatusTest {
private static final File storageDir = new File("target" +
java.io.File.separator + "test");
private static final String prefix = "version";
private static final MultiLeaderConfig config = new
MultiLeaderConfig.Builder().build();
+ private static final long CHECK_POINT_GAP = 500;
@Before
public void setUp() throws IOException {
@@ -54,7 +55,8 @@ public class SyncStatusTest {
/** Confirm success from front to back */
@Test
public void sequenceTest() throws InterruptedException {
- IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix);
+ IndexController controller =
+ new IndexController(storageDir.getAbsolutePath(), prefix,
CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
SyncStatus status = new SyncStatus(controller, config);
@@ -79,7 +81,8 @@ public class SyncStatusTest {
/** Confirm success from back to front */
@Test
public void reverseTest() throws InterruptedException {
- IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix);
+ IndexController controller =
+ new IndexController(storageDir.getAbsolutePath(), prefix,
CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -111,7 +114,8 @@ public class SyncStatusTest {
/** Confirm success first from front to back, then back to front */
@Test
public void mixedTest() throws InterruptedException {
- IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix);
+ IndexController controller =
+ new IndexController(storageDir.getAbsolutePath(), prefix,
CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -155,7 +159,8 @@ public class SyncStatusTest {
/** Test Blocking while addNextBatch */
@Test
public void waitTest() throws InterruptedException, ExecutionException {
- IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix);
+ IndexController controller =
+ new IndexController(storageDir.getAbsolutePath(), prefix,
CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
SyncStatus status = new SyncStatus(controller, config);