This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0729_test_exp1_no_write
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_0729_test_exp1_no_write by
this push:
new c70de09193 fix a potential issue
c70de09193 is described below
commit c70de09193de326a2ea205b6cdb546ee13cf2046
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Fri Aug 5 18:47:58 2022 +0800
fix a potential issue
---
.../consensus/multileader/MultiLeaderServerImpl.java | 9 ++++++++-
.../multileader/logdispatcher/LogDispatcher.java | 20 +++++++++++++++-----
2 files changed, 23 insertions(+), 6 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 973a31272d..791be89917 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -130,7 +130,10 @@ public class MultiLeaderServerImpl {
StepTracker.trace("stateMachineWrite", startTimeAfterLock,
System.nanoTime());
long offerStartTime = System.nanoTime();
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- logDispatcher.offer(indexedConsensusRequest);
+ synchronized (index) {
+ logDispatcher.offer(indexedConsensusRequest);
+ index.incrementAndGet();
+ }
} else {
logger.debug(
"{}: write operation failed. searchIndex: {}. Code: {}",
@@ -230,4 +233,8 @@ public class MultiLeaderServerImpl {
public MultiLeaderConfig getConfig() {
return config;
}
+
+ public AtomicLong getIndexObject() {
+ return index;
+ }
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 1ec4dbd490..ac941e6453 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -266,6 +266,7 @@ public class LogDispatcher {
PendingBatch batch;
List<TLogBatch> logBatches = new ArrayList<>();
long startIndex = syncStatus.getNextSendingIndex();
+ long maxIndexWhenBufferedRequestEmpty = startIndex;
logger.debug("[GetBatch] startIndex: {}", startIndex);
long endIndex;
if (bufferedRequest.size() <=
config.getReplication().getMaxRequestPerBatch()) {
@@ -275,9 +276,12 @@ public class LogDispatcher {
impl.getThisNode().getGroupId(),
pendingRequest.size(),
bufferedRequest.size());
- pendingRequest.drainTo(
- bufferedRequest,
- config.getReplication().getMaxRequestPerBatch() -
bufferedRequest.size());
+ synchronized (impl.getIndexObject()) {
+ pendingRequest.drainTo(
+ bufferedRequest,
+ config.getReplication().getMaxRequestPerBatch() -
bufferedRequest.size());
+ maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
+ }
// remove all request that searchIndex < startIndex
Iterator<IndexedConsensusRequest> iterator =
bufferedRequest.iterator();
while (iterator.hasNext()) {
@@ -289,8 +293,13 @@ public class LogDispatcher {
}
}
}
- if (bufferedRequest.isEmpty()) { // only execute this after a restart
- endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1,
logBatches);
+ // This branch is taken in several scenarios:
+ // 1. After a restart.
+ // 2. getBatch() is invoked right at the moment the
pending requests have all been
+ // consumed.
+ if (bufferedRequest.isEmpty()) {
+ endIndex =
+ constructBatchFromWAL(startIndex,
maxIndexWhenBufferedRequestEmpty, logBatches);
batch = new PendingBatch(startIndex, endIndex, logBatches);
logger.debug(
"{} : accumulated a {} from wal when empty",
impl.getThisNode().getGroupId(), batch);
@@ -376,6 +385,7 @@ public class LogDispatcher {
currentIndex,
maxIndex,
iteratorIndex));
+ // Even if there are no WAL files, this code won't produce an error.
if (iteratorIndex != currentIndex) {
walEntryiterator.skipTo(currentIndex);
iteratorIndex = currentIndex;