This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e5f8a193f83 Add a multiplier to avoid receiver OOM in IoTConsensus (#16102)
e5f8a193f83 is described below
commit e5f8a193f8388da71c3d24fff32082ec771a196f
Author: Jiang Tian <[email protected]>
AuthorDate: Wed Aug 6 09:27:28 2025 +0800
Add a multiplier to avoid receiver OOM in IoTConsensus (#16102)
---
.../request/DeserializedBatchIndexedConsensusRequest.java | 7 +++++++
.../apache/iotdb/consensus/iot/client/DispatchLogHandler.java | 5 +++++
.../org/apache/iotdb/consensus/iot/logdispatcher/Batch.java | 10 +++++++++-
.../iotdb/consensus/iot/logdispatcher/LogDispatcher.java | 11 +++++++++++
.../iot/service/IoTConsensusRPCServiceProcessor.java | 3 ++-
.../thrift-consensus/src/main/thrift/iotconsensus.thrift | 1 +
6 files changed, 35 insertions(+), 2 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
index 3cb81fa95c7..9cdeaf60c30 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
@@ -29,6 +29,7 @@ public class DeserializedBatchIndexedConsensusRequest
   private final long startSyncIndex;
   private final long endSyncIndex;
   private final List<IConsensusRequest> insertNodes;
+  private long memorySize;
 
   public DeserializedBatchIndexedConsensusRequest(
       long startSyncIndex, long endSyncIndex, int size) {
@@ -52,6 +53,7 @@ public class DeserializedBatchIndexedConsensusRequest
 
   public void add(IConsensusRequest insertNode) {
     this.insertNodes.add(insertNode);
+    this.memorySize += insertNode.getMemorySize();
   }
 
   @Override
@@ -82,4 +84,9 @@ public class DeserializedBatchIndexedConsensusRequest
   public ByteBuffer serializeToByteBuffer() {
     return null;
   }
+
+  @Override
+  public long getMemorySize() {
+    return memorySize;
+  }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index d587e25ee19..379098582b6 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.iot.client;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.consensus.iot.logdispatcher.Batch;
+import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
 import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher.LogDispatcherThread;
 import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
 import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
@@ -93,6 +94,10 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe
       }
       completeBatch(batch);
     }
+    if (response.isSetReceiverMemSize()) {
+      LogDispatcher.getReceiverMemSizeSum().addAndGet(response.getReceiverMemSize());
+      LogDispatcher.getSenderMemSizeSum().addAndGet(batch.getMemorySize());
+    }
     logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - createTime);
   }
 
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
index 4e89226c393..72b68ab96ac 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
@@ -63,8 +63,16 @@ public class Batch {
   }
 
   public boolean canAccumulate() {
+    // When reading entries from the WAL, the memory size is calculated based on the serialized
+    // size, which can be significantly smaller than the actual size.
+    // Thus, we add a multiplier to sender's memory size to estimate the receiver's memory cost.
+    // The multiplier is calculated based on the receiver's feedback.
+    long receiverMemSize = LogDispatcher.getReceiverMemSizeSum().get();
+    long senderMemSize = LogDispatcher.getSenderMemSizeSum().get();
+    double multiplier = senderMemSize > 0 ? (double) receiverMemSize / senderMemSize : 1.0;
+    multiplier = Math.max(multiplier, 1.0);
     return logEntries.size() < config.getReplication().getMaxLogEntriesNumPerBatch()
-        && memorySize < config.getReplication().getMaxSizePerBatch();
+        && ((long) (memorySize * multiplier)) < config.getReplication().getMaxSizePerBatch();
   }
 
   public long getStartIndex() {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 89ece0c00fd..00cd6d1376f 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -69,6 +69,9 @@ public class LogDispatcher {
   private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
   private final AtomicLong logEntriesFromQueue = new AtomicLong(0);
 
+  private static final AtomicLong senderMemSizeSum = new AtomicLong(0);
+  private static final AtomicLong receiverMemSizeSum = new AtomicLong(0);
+
   public LogDispatcher(
       IoTConsensusServerImpl impl,
       IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) {
@@ -591,4 +594,12 @@ public class LogDispatcher {
               request.getMemorySize()));
     }
   }
+
+  public static AtomicLong getReceiverMemSizeSum() {
+    return receiverMemSizeSum;
+  }
+
+  public static AtomicLong getSenderMemSizeSum() {
+    return senderMemSizeSum;
+  }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 2bac66738fd..71c14aebaa1 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -129,7 +129,8 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Ifa
         "execute TSyncLogEntriesReq for {} with result {}",
         req.consensusGroupId,
         writeStatus.subStatus);
-    return new TSyncLogEntriesRes(writeStatus.subStatus);
+    return new TSyncLogEntriesRes(writeStatus.subStatus)
+        .setReceiverMemSize(deserializedRequest.getMemorySize());
   }
 
   @Override
diff --git a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
index dbcffe02ba6..829443f9552 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
@@ -38,6 +38,7 @@ struct TSyncLogEntriesReq {
 
 struct TSyncLogEntriesRes {
   1: required list<common.TSStatus> statuses
+  2: optional i64 receiverMemSize
 }
 
 struct TInactivatePeerReq {