This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_optimize_reader
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_optimize_reader by this
push:
new 52cce30a05 fix some issues when constructing batch
52cce30a05 is described below
commit 52cce30a058c00bdd119c773eecb2b094a13a0ea
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Tue Sep 13 18:01:50 2022 +0800
fix some issues when constructing batch
---
.../multileader/logdispatcher/LogDispatcher.java | 34 +++++++++-------------
1 file changed, 13 insertions(+), 21 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index abd491dc5d..2973e4e3ce 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -346,14 +346,13 @@ public class LogDispatcher {
currentIndex,
maxIndex,
iteratorIndex));
+ long targetIndex = currentIndex;
// Even if there is no WAL files, these code won't produce error.
- if (iteratorIndex != currentIndex) {
- walEntryiterator.skipTo(currentIndex);
- iteratorIndex = currentIndex;
- }
- while (currentIndex < maxIndex
+ walEntryiterator.skipTo(targetIndex);
+
+ while (targetIndex < maxIndex
&& logBatches.size() <
config.getReplication().getMaxRequestPerBatch()) {
- logger.debug("construct from WAL for one Entry, index : {}",
currentIndex);
+ logger.debug("construct from WAL for one Entry, index : {}",
targetIndex);
try {
walEntryiterator.waitForNextReady();
} catch (InterruptedException e) {
@@ -361,38 +360,31 @@ public class LogDispatcher {
logger.warn("wait for next WAL entry is interrupted");
}
IndexedConsensusRequest data = walEntryiterator.next();
- if (currentIndex > data.getSearchIndex()) {
+ if (targetIndex > data.getSearchIndex()) {
// if the index of request is smaller than currentIndex, then
continue
logger.warn(
"search for one Entry which index is {}, but find a smaller one,
index : {}",
- currentIndex,
+ targetIndex,
data.getSearchIndex());
continue;
- } else if (currentIndex < data.getSearchIndex()) {
+ } else if (targetIndex < data.getSearchIndex()) {
logger.warn(
"search for one Entry which index is {}, but find a larger one,
index : {}",
- currentIndex,
+ targetIndex,
data.getSearchIndex());
if (data.getSearchIndex() >= maxIndex) {
// if the index of request is larger than maxIndex, then finish
- walEntryiterator.skipTo(currentIndex);
break;
}
- // if the index of request is larger than currentIndex, and smaller
than maxIndex, then
- // skip to index
- currentIndex = data.getSearchIndex();
- walEntryiterator.skipTo(currentIndex);
- iteratorIndex = currentIndex;
}
+ targetIndex = data.getSearchIndex() + 1;
// construct request from wal
for (IConsensusRequest innerRequest : data.getRequests()) {
- logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(),
currentIndex, true));
- }
- if (currentIndex == maxIndex - 1) {
- break;
+ logBatches.add(
+ new TLogBatch(innerRequest.serializeToByteBuffer(),
data.getSearchIndex(), true));
}
}
- return currentIndex;
+ return logBatches.size() > 0 ? logBatches.get(0).searchIndex :
currentIndex;
}
private void constructBatchIndexedFromConsensusRequest(