This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 00a36dcfd0f Pipe: Added memory control for aligned chunk reader in
TsFileInsertionEventScanParser & TsFileInsertionEventTableParserTabletIterator
(#15222) (#15270)
00a36dcfd0f is described below
commit 00a36dcfd0f2f2f211e821d067bb3b15b1aa4c5b
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 7 12:31:55 2025 +0800
Pipe: Added memory control for aligned chunk reader in
TsFileInsertionEventScanParser & TsFileInsertionEventTableParserTabletIterator
(#15222) (#15270)
Co-authored-by: luoluoyuyu <[email protected]>
---
.../container/scan/TsFileInsertionScanDataContainer.java | 10 ++++++++--
.../main/java/org/apache/iotdb/commons/conf/CommonConfig.java | 9 +++++++++
.../java/org/apache/iotdb/commons/conf/CommonDescriptor.java | 5 +++++
.../java/org/apache/iotdb/commons/pipe/config/PipeConfig.java | 5 +++++
4 files changed, 27 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index eca344eaeca..9692abcac76 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -66,6 +67,8 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
private static final LocalDate EMPTY_DATE = LocalDate.of(1000, 1, 1);
+ private static final int PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH =
+ PipeConfig.getInstance().getPipeMaxAlignedSeriesNumInOneBatch();
private final long startTime;
private final long endTime;
private final Filter filter;
@@ -449,13 +452,16 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
chunkHeader.getMeasurementID(),
(measurement, index) -> Objects.nonNull(index) ? index + 1
: 0);
- // Emit when encountered non-sequential value chunk
+ // Emit when encountered non-sequential value chunk, or the chunk
list size exceeds
+ // certain value to avoid OOM
// Do not record or end current value chunks when there are empty
chunks
if (chunkHeader.getDataSize() == 0) {
break;
}
boolean needReturn = false;
- if (lastIndex >= 0 && valueIndex != lastIndex) {
+ if (lastIndex >= 0
+ && (valueIndex != lastIndex
+ || valueChunkList.size() >=
PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH)) {
needReturn = recordAlignedChunk(valueChunkList, marker);
}
lastIndex = valueIndex;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 10e1954f39c..ef289d75c19 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -289,6 +289,7 @@ public class CommonConfig {
private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
private volatile long pipeCheckMemoryEnoughIntervalMs = 10L;
private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
+ private int pipeMaxAlignedSeriesNumInOneBatch = 15;
private long pipeListeningQueueTransferSnapshotThreshold = 1000;
private int pipeSnapshotExecutionMaxBatchSize = 1000;
private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
@@ -1293,6 +1294,14 @@ public class CommonConfig {
this.pipeLeaderCacheMemoryUsagePercentage =
pipeLeaderCacheMemoryUsagePercentage;
}
+ public int getPipeMaxAlignedSeriesNumInOneBatch() {
+ return pipeMaxAlignedSeriesNumInOneBatch;
+ }
+
+ public void setPipeMaxAlignedSeriesNumInOneBatch(int
pipeMaxAlignedSeriesNumInOneBatch) {
+ this.pipeMaxAlignedSeriesNumInOneBatch = pipeMaxAlignedSeriesNumInOneBatch;
+ }
+
public long getPipeListeningQueueTransferSnapshotThreshold() {
return pipeListeningQueueTransferSnapshotThreshold;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index b31d619ae29..676458299b7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -671,6 +671,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_leader_cache_memory_usage_percentage",
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
+ config.setPipeMaxAlignedSeriesNumInOneBatch(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_max_aligned_series_num_in_one_batch",
+
String.valueOf(config.getPipeMaxAlignedSeriesNumInOneBatch()))));
config.setPipeListeningQueueTransferSnapshotThreshold(
Long.parseLong(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 590d9f864c0..e638441046b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -196,6 +196,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
}
+ public int getPipeMaxAlignedSeriesNumInOneBatch() {
+ return COMMON_CONFIG.getPipeMaxAlignedSeriesNumInOneBatch();
+ }
+
public long getPipeListeningQueueTransferSnapshotThreshold() {
return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
}
@@ -448,6 +452,7 @@ public class PipeConfig {
isPipeConnectorRPCThriftCompressionEnabled());
LOGGER.info(
"PipeLeaderCacheMemoryUsagePercentage: {}",
getPipeLeaderCacheMemoryUsagePercentage());
+ LOGGER.info("PipeMaxAlignedSeriesNumInOneBatch: {}",
getPipeMaxAlignedSeriesNumInOneBatch());
LOGGER.info(
"PipeListeningQueueTransferSnapshotThreshold: {}",
getPipeListeningQueueTransferSnapshotThreshold());