This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c6d5e29b327 more accurate mermory size (#15713)
c6d5e29b327 is described below
commit c6d5e29b327699b295b1c3302898371076fada95
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Mon Jun 16 09:40:00 2025 +0800
more accurate mermory size (#15713)
---
.../apache/iotdb/consensus/iot/logdispatcher/Batch.java | 17 +++++++----------
.../consensus/iot/logdispatcher/LogDispatcher.java | 9 +++++++--
.../iotdb/consensus/iot/logdispatcher/SyncStatus.java | 6 +++---
.../src/main/thrift/iotconsensus.thrift | 1 +
4 files changed, 18 insertions(+), 15 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
index 7a556c85a04..4e89226c393 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.iot.thrift.TLogEntry;
-import java.nio.Buffer;
import java.util.ArrayList;
import java.util.List;
@@ -37,7 +36,7 @@ public class Batch {
private long logEntriesNumFromWAL = 0L;
- private long serializedSize;
+ private long memorySize;
// indicates whether this batch has been successfully synchronized to
another node
private boolean synced;
@@ -60,14 +59,12 @@ public class Batch {
if (entry.fromWAL) {
logEntriesNumFromWAL++;
}
- // TODO Maybe we need to add in additional fields for more accurate
calculations
- serializedSize +=
- entry.getData() == null ? 0 :
entry.getData().stream().mapToInt(Buffer::capacity).sum();
+ memorySize += entry.getMemorySize();
}
public boolean canAccumulate() {
return logEntries.size() <
config.getReplication().getMaxLogEntriesNumPerBatch()
- && serializedSize < config.getReplication().getMaxSizePerBatch();
+ && memorySize < config.getReplication().getMaxSizePerBatch();
}
public long getStartIndex() {
@@ -94,8 +91,8 @@ public class Batch {
return logEntries.isEmpty();
}
- public long getSerializedSize() {
- return serializedSize;
+ public long getMemorySize() {
+ return memorySize;
}
public long getLogEntriesNumFromWAL() {
@@ -111,8 +108,8 @@ public class Batch {
+ endIndex
+ ", size="
+ logEntries.size()
- + ", serializedSize="
- + serializedSize
+ + ", memorySize="
+ + memorySize
+ '}';
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 2d4e7cd5e01..c3a0665be6a 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -567,7 +567,8 @@ public class LogDispatcher {
data.buildSerializedRequests();
// construct request from wal
logBatches.addTLogEntry(
- new TLogEntry(data.getSerializedRequests(), data.getSearchIndex(),
true));
+ new TLogEntry(
+ data.getSerializedRequests(), data.getSearchIndex(), true,
data.getMemorySize()));
}
// In the case of corrupt Data, we return true so that we can send a
batch as soon as
// possible, avoiding potential duplication
@@ -577,7 +578,11 @@ public class LogDispatcher {
private void constructBatchIndexedFromConsensusRequest(
IndexedConsensusRequest request, Batch logBatches) {
logBatches.addTLogEntry(
- new TLogEntry(request.getSerializedRequests(),
request.getSearchIndex(), false));
+ new TLogEntry(
+ request.getSerializedRequests(),
+ request.getSearchIndex(),
+ false,
+ request.getMemorySize()));
}
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index e11b6302114..fe00939050e 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -45,7 +45,7 @@ public class SyncStatus {
*/
public synchronized void addNextBatch(Batch batch) throws
InterruptedException {
while (pendingBatches.size() >=
config.getReplication().getMaxPendingBatchesNum()
- || !iotConsensusMemoryManager.reserve(batch.getSerializedSize(),
false)) {
+ || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) {
wait();
}
pendingBatches.add(batch);
@@ -64,7 +64,7 @@ public class SyncStatus {
while (current.isSynced()) {
controller.update(current.getEndIndex(), false);
iterator.remove();
- iotConsensusMemoryManager.free(current.getSerializedSize(), false);
+ iotConsensusMemoryManager.free(current.getMemorySize(), false);
if (iterator.hasNext()) {
current = iterator.next();
} else {
@@ -79,7 +79,7 @@ public class SyncStatus {
public synchronized void free() {
long size = 0;
for (Batch pendingBatch : pendingBatches) {
- size += pendingBatch.getSerializedSize();
+ size += pendingBatch.getMemorySize();
}
pendingBatches.clear();
controller.update(0L, true);
diff --git
a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
index d0b4808977e..dbcffe02ba6 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
@@ -26,6 +26,7 @@ struct TLogEntry {
1: required list<binary> data
2: required i64 searchIndex
3: required bool fromWAL
+ 4: required i64 memorySize
}
struct TSyncLogEntriesReq {