This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch multi_leader_memory_pendingBatch_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/multi_leader_memory_pendingBatch_control by this push:
new dd0b79582d refine dispatcher logic
dd0b79582d is described below
commit dd0b79582dc92a1c65348d693fcd6d9a295d2e8c
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Mon Nov 14 21:44:35 2022 +0800
refine dispatcher logic
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../multileader/logdispatcher/LogDispatcher.java | 52 +++++++++++-----------
.../multileader/logdispatcher/SyncStatus.java | 2 +-
2 files changed, 27 insertions(+), 27 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index e2c44c2570..8c7a443d3c 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -349,47 +349,47 @@ public class LogDispatcher {
public PendingBatch getBatch() {
long startIndex = syncStatus.getNextSendingIndex();
- long maxIndexWhenBufferedRequestEmpty = startIndex;
- logger.debug("[GetBatch] startIndex: {}", startIndex);
- if (bufferedRequest.size() <=
config.getReplication().getMaxRequestNumPerBatch()) {
- // Use drainTo instead of poll to reduce lock overhead
+ long maxIndex;
+ synchronized (impl.getIndexObject()) {
+ maxIndex = impl.getIndex() + 1;
logger.debug(
- "{} : pendingRequest Size: {}, bufferedRequest size: {}",
+ "{}: startIndex: {}, maxIndex: {}, pendingRequest size: {},
bufferedRequest size: {}",
impl.getThisNode().getGroupId(),
+ startIndex,
+ maxIndex,
getPendingRequestSize(),
bufferedRequest.size());
- synchronized (impl.getIndexObject()) {
- pendingRequest.drainTo(
- bufferedRequest,
- config.getReplication().getMaxRequestNumPerBatch() -
bufferedRequest.size());
- maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
- }
- // remove all request that searchIndex < startIndex
- Iterator<IndexedConsensusRequest> iterator =
bufferedRequest.iterator();
- while (iterator.hasNext()) {
- IndexedConsensusRequest request = iterator.next();
- if (request.getSearchIndex() < startIndex) {
- iterator.remove();
- releaseReservedMemory(request);
- } else {
- break;
- }
+ // Use drainTo instead of poll to reduce lock overhead
+ pendingRequest.drainTo(
+ bufferedRequest,
+ config.getReplication().getMaxRequestNumPerBatch() -
bufferedRequest.size());
+ }
+ // remove all request that searchIndex < startIndex
+ Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
+ while (iterator.hasNext()) {
+ IndexedConsensusRequest request = iterator.next();
+ if (request.getSearchIndex() < startIndex) {
+ iterator.remove();
+ releaseReservedMemory(request);
+ } else {
+ break;
}
}
+
PendingBatch batches = new PendingBatch(config);
// This condition will be executed in several scenarios:
// 1. restart
// 2. The getBatch() is invoked immediately at the moment the
PendingRequests are consumed
// up. To prevent inconsistency here, we use the synchronized logic when
calculate value of
- // `maxIndexWhenBufferedRequestEmpty`
+ // `maxIndex`
if (bufferedRequest.isEmpty()) {
- constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty,
batches);
+ constructBatchFromWAL(startIndex, maxIndex, batches);
batches.buildIndex();
logger.debug(
"{} : accumulated a {} from wal when empty",
impl.getThisNode().getGroupId(), batches);
} else {
// Notice that prev searchIndex >= startIndex
- Iterator<IndexedConsensusRequest> iterator =
bufferedRequest.iterator();
+ iterator = bufferedRequest.iterator();
IndexedConsensusRequest prev = iterator.next();
// Prevents gap between logs. For example, some requests are not
written into the queue when
@@ -481,14 +481,14 @@ public class LogDispatcher {
logger.warn("wait for next WAL entry is interrupted");
}
IndexedConsensusRequest data = walEntryIterator.next();
- if (targetIndex > data.getSearchIndex()) {
+ if (data.getSearchIndex() < targetIndex) {
// if the index of request is smaller than currentIndex, then
continue
logger.warn(
"search for one Entry which index is {}, but find a smaller one,
index : {}",
targetIndex,
data.getSearchIndex());
continue;
- } else if (targetIndex < data.getSearchIndex()) {
+ } else if (data.getSearchIndex() > targetIndex) {
logger.warn(
"search for one Entry which index is {}, but find a larger one,
index : {}",
targetIndex,
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
index e0ec7d4023..25af8b8fcd 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
@@ -80,7 +80,7 @@ public class SyncStatus {
public void free() {
long size = 0;
for (PendingBatch pendingBatch : pendingBatches) {
- size = pendingBatch.getSerializedSize();
+ size += pendingBatch.getSerializedSize();
}
pendingBatches.clear();
multiLeaderMemoryManager.free(size);