This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 00a36dcfd0f Pipe: Added memory control for aligned chunk reader in 
TsFileInsertionEventScanParser & TsFileInsertionEventTableParserTabletIterator 
(#15222) (#15270)
00a36dcfd0f is described below

commit 00a36dcfd0f2f2f211e821d067bb3b15b1aa4c5b
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 7 12:31:55 2025 +0800

    Pipe: Added memory control for aligned chunk reader in 
TsFileInsertionEventScanParser & TsFileInsertionEventTableParserTabletIterator 
(#15222) (#15270)
    
    Co-authored-by: luoluoyuyu <[email protected]>
---
 .../container/scan/TsFileInsertionScanDataContainer.java       | 10 ++++++++--
 .../main/java/org/apache/iotdb/commons/conf/CommonConfig.java  |  9 +++++++++
 .../java/org/apache/iotdb/commons/conf/CommonDescriptor.java   |  5 +++++
 .../java/org/apache/iotdb/commons/pipe/config/PipeConfig.java  |  5 +++++
 4 files changed, 27 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index eca344eaeca..9692abcac76 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -66,6 +67,8 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
 
   private static final LocalDate EMPTY_DATE = LocalDate.of(1000, 1, 1);
 
+  private static final int PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH =
+      PipeConfig.getInstance().getPipeMaxAlignedSeriesNumInOneBatch();
   private final long startTime;
   private final long endTime;
   private final Filter filter;
@@ -449,13 +452,16 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
                     chunkHeader.getMeasurementID(),
                     (measurement, index) -> Objects.nonNull(index) ? index + 1 
: 0);
 
-            // Emit when encountered non-sequential value chunk
+            // Emit when encountered non-sequential value chunk, or the chunk 
list size exceeds
+            // certain value to avoid OOM
             // Do not record or end current value chunks when there are empty 
chunks
             if (chunkHeader.getDataSize() == 0) {
               break;
             }
             boolean needReturn = false;
-            if (lastIndex >= 0 && valueIndex != lastIndex) {
+            if (lastIndex >= 0
+                && (valueIndex != lastIndex
+                    || valueChunkList.size() >= 
PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH)) {
               needReturn = recordAlignedChunk(valueChunkList, marker);
             }
             lastIndex = valueIndex;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 10e1954f39c..ef289d75c19 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -289,6 +289,7 @@ public class CommonConfig {
   private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
   private volatile long pipeCheckMemoryEnoughIntervalMs = 10L;
   private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
+  private int pipeMaxAlignedSeriesNumInOneBatch = 15;
   private long pipeListeningQueueTransferSnapshotThreshold = 1000;
   private int pipeSnapshotExecutionMaxBatchSize = 1000;
   private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
@@ -1293,6 +1294,14 @@ public class CommonConfig {
     this.pipeLeaderCacheMemoryUsagePercentage = 
pipeLeaderCacheMemoryUsagePercentage;
   }
 
+  public int getPipeMaxAlignedSeriesNumInOneBatch() {
+    return pipeMaxAlignedSeriesNumInOneBatch;
+  }
+
+  public void setPipeMaxAlignedSeriesNumInOneBatch(int 
pipeMaxAlignedSeriesNumInOneBatch) {
+    this.pipeMaxAlignedSeriesNumInOneBatch = pipeMaxAlignedSeriesNumInOneBatch;
+  }
+
   public long getPipeListeningQueueTransferSnapshotThreshold() {
     return pipeListeningQueueTransferSnapshotThreshold;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index b31d619ae29..676458299b7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -671,6 +671,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_leader_cache_memory_usage_percentage",
                 
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
+    config.setPipeMaxAlignedSeriesNumInOneBatch(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_max_aligned_series_num_in_one_batch",
+                
String.valueOf(config.getPipeMaxAlignedSeriesNumInOneBatch()))));
     config.setPipeListeningQueueTransferSnapshotThreshold(
         Long.parseLong(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 590d9f864c0..e638441046b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -196,6 +196,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
   }
 
+  public int getPipeMaxAlignedSeriesNumInOneBatch() {
+    return COMMON_CONFIG.getPipeMaxAlignedSeriesNumInOneBatch();
+  }
+
   public long getPipeListeningQueueTransferSnapshotThreshold() {
     return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
   }
@@ -448,6 +452,7 @@ public class PipeConfig {
         isPipeConnectorRPCThriftCompressionEnabled());
     LOGGER.info(
         "PipeLeaderCacheMemoryUsagePercentage: {}", 
getPipeLeaderCacheMemoryUsagePercentage());
+    LOGGER.info("PipeMaxAlignedSeriesNumInOneBatch: {}", 
getPipeMaxAlignedSeriesNumInOneBatch());
     LOGGER.info(
         "PipeListeningQueueTransferSnapshotThreshold: {}",
         getPipeListeningQueueTransferSnapshotThreshold());

Reply via email to