This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dc583e9c59 Pipe: Optimize TsFile parsing logic (#16173)
0dc583e9c59 is described below

commit 0dc583e9c59433845d715351b6fa746a92abb5be
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Aug 18 14:39:51 2025 +0800

    Pipe: Optimize TsFile parsing logic (#16173)
    
    * Pipe: Optimize TsFile parsing logic
    
    * update
---
 .../tsfile/parser/TsFileInsertionEventParser.java  |  11 +-
 .../query/TsFileInsertionEventQueryParser.java     | 215 +++++++++---------
 ...ileInsertionEventQueryParserTabletIterator.java |  14 +-
 .../scan/TsFileInsertionEventScanParser.java       | 139 ++++++------
 .../table/TsFileInsertionEventTableParser.java     | 247 +++++++++++----------
 ...ileInsertionEventTableParserTabletIterator.java |  38 ++--
 6 files changed, 354 insertions(+), 310 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
index 57616533b80..358103175fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.event.common.tsfile.parser;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
@@ -59,6 +60,8 @@ public abstract class TsFileInsertionEventParser implements 
AutoCloseable {
 
   protected TsFileSequenceReader tsFileSequenceReader;
 
+  protected Iterable<TabletInsertionEvent> tabletInsertionIterable;
+
   protected TsFileInsertionEventParser(
       final String pipeName,
       final long creationTime,
@@ -83,9 +86,10 @@ public abstract class TsFileInsertionEventParser implements 
AutoCloseable {
     this.pipeTaskMeta = pipeTaskMeta;
     this.sourceEvent = sourceEvent;
 
-    // Allocate empty memory block, will be resized later.
     this.allocatedMemoryBlockForTablet =
-        
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+        PipeDataNodeResourceManager.memory()
+            .forceAllocateForTabletWithRetry(
+                
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
   }
 
   /**
@@ -95,6 +99,9 @@ public abstract class TsFileInsertionEventParser implements 
AutoCloseable {
 
   @Override
   public void close() {
+
+    tabletInsertionIterable = null;
+
     try {
       if (pipeName != null && !timeUsageReported) {
         PipeTsFileToTabletsMetrics.getInstance()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
index ba07299d66b..d61f7a791ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
@@ -277,111 +277,118 @@ public class TsFileInsertionEventQueryParser extends 
TsFileInsertionEventParser
 
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
-    return () ->
-        new Iterator<TabletInsertionEvent>() {
-
-          private TsFileInsertionEventQueryParserTabletIterator tabletIterator 
= null;
-
-          @Override
-          public boolean hasNext() {
-            while (tabletIterator == null || !tabletIterator.hasNext()) {
-              if (!deviceMeasurementsMapIterator.hasNext()) {
-                close();
-                return false;
-              }
-
-              final Map.Entry<IDeviceID, List<String>> entry = 
deviceMeasurementsMapIterator.next();
-
-              try {
-                tabletIterator =
-                    new TsFileInsertionEventQueryParserTabletIterator(
-                        tsFileReader,
-                        measurementDataTypeMap,
-                        entry.getKey(),
-                        entry.getValue(),
-                        timeFilterExpression,
-                        allocatedMemoryBlockForTablet);
-              } catch (final Exception e) {
-                close();
-                throw new PipeException("failed to create 
TsFileInsertionDataTabletIterator", e);
-              }
-            }
-
-            return true;
-          }
+    if (tabletInsertionIterable == null) {
+      tabletInsertionIterable =
+          () ->
+              new Iterator<TabletInsertionEvent>() {
+
+                private TsFileInsertionEventQueryParserTabletIterator 
tabletIterator = null;
+
+                @Override
+                public boolean hasNext() {
+                  while (tabletIterator == null || !tabletIterator.hasNext()) {
+                    if (!deviceMeasurementsMapIterator.hasNext()) {
+                      close();
+                      return false;
+                    }
+
+                    final Map.Entry<IDeviceID, List<String>> entry =
+                        deviceMeasurementsMapIterator.next();
+
+                    try {
+                      tabletIterator =
+                          new TsFileInsertionEventQueryParserTabletIterator(
+                              tsFileReader,
+                              measurementDataTypeMap,
+                              entry.getKey(),
+                              entry.getValue(),
+                              timeFilterExpression,
+                              allocatedMemoryBlockForTablet);
+                    } catch (final Exception e) {
+                      close();
+                      throw new PipeException(
+                          "failed to create 
TsFileInsertionDataTabletIterator", e);
+                    }
+                  }
+
+                  return true;
+                }
+
+                @Override
+                public TabletInsertionEvent next() {
+                  if (!hasNext()) {
+                    close();
+                    throw new NoSuchElementException();
+                  }
+
+                  final Tablet tablet = tabletIterator.next();
+                  final boolean isAligned =
+                      deviceIsAlignedMap.getOrDefault(
+                          
IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), false);
+
+                  final TabletInsertionEvent next;
+                  if (!hasNext()) {
+                    next =
+                        sourceEvent == null
+                            ? new PipeRawTabletInsertionEvent(
+                                null,
+                                null,
+                                null,
+                                null,
+                                tablet,
+                                isAligned,
+                                null,
+                                0,
+                                pipeTaskMeta,
+                                sourceEvent,
+                                true)
+                            : new PipeRawTabletInsertionEvent(
+                                sourceEvent.getRawIsTableModelEvent(),
+                                
sourceEvent.getSourceDatabaseNameFromDataRegion(),
+                                sourceEvent.getRawTableModelDataBase(),
+                                sourceEvent.getRawTreeModelDataBase(),
+                                tablet,
+                                isAligned,
+                                sourceEvent.getPipeName(),
+                                sourceEvent.getCreationTime(),
+                                pipeTaskMeta,
+                                sourceEvent,
+                                true);
+                    close();
+                  } else {
+                    next =
+                        sourceEvent == null
+                            ? new PipeRawTabletInsertionEvent(
+                                null,
+                                null,
+                                null,
+                                null,
+                                tablet,
+                                isAligned,
+                                null,
+                                0,
+                                pipeTaskMeta,
+                                sourceEvent,
+                                false)
+                            : new PipeRawTabletInsertionEvent(
+                                sourceEvent.getRawIsTableModelEvent(),
+                                
sourceEvent.getSourceDatabaseNameFromDataRegion(),
+                                sourceEvent.getRawTableModelDataBase(),
+                                sourceEvent.getRawTreeModelDataBase(),
+                                tablet,
+                                isAligned,
+                                sourceEvent.getPipeName(),
+                                sourceEvent.getCreationTime(),
+                                pipeTaskMeta,
+                                sourceEvent,
+                                false);
+                  }
+                  return next;
+                }
+              };
+    }
 
-          @Override
-          public TabletInsertionEvent next() {
-            if (!hasNext()) {
-              close();
-              throw new NoSuchElementException();
-            }
-
-            final Tablet tablet = tabletIterator.next();
-            final boolean isAligned =
-                deviceIsAlignedMap.getOrDefault(
-                    
IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), false);
-
-            final TabletInsertionEvent next;
-            if (!hasNext()) {
-              next =
-                  sourceEvent == null
-                      ? new PipeRawTabletInsertionEvent(
-                          null,
-                          null,
-                          null,
-                          null,
-                          tablet,
-                          isAligned,
-                          null,
-                          0,
-                          pipeTaskMeta,
-                          sourceEvent,
-                          true)
-                      : new PipeRawTabletInsertionEvent(
-                          sourceEvent.getRawIsTableModelEvent(),
-                          sourceEvent.getSourceDatabaseNameFromDataRegion(),
-                          sourceEvent.getRawTableModelDataBase(),
-                          sourceEvent.getRawTreeModelDataBase(),
-                          tablet,
-                          isAligned,
-                          sourceEvent.getPipeName(),
-                          sourceEvent.getCreationTime(),
-                          pipeTaskMeta,
-                          sourceEvent,
-                          true);
-              close();
-            } else {
-              next =
-                  sourceEvent == null
-                      ? new PipeRawTabletInsertionEvent(
-                          null,
-                          null,
-                          null,
-                          null,
-                          tablet,
-                          isAligned,
-                          null,
-                          0,
-                          pipeTaskMeta,
-                          sourceEvent,
-                          false)
-                      : new PipeRawTabletInsertionEvent(
-                          sourceEvent.getRawIsTableModelEvent(),
-                          sourceEvent.getSourceDatabaseNameFromDataRegion(),
-                          sourceEvent.getRawTableModelDataBase(),
-                          sourceEvent.getRawTreeModelDataBase(),
-                          tablet,
-                          isAligned,
-                          sourceEvent.getPipeName(),
-                          sourceEvent.getCreationTime(),
-                          pipeTaskMeta,
-                          sourceEvent,
-                          false);
-            }
-            return next;
-          }
-        };
+    return tabletInsertionIterable;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
index b63fc682a1f..7c32321186e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
@@ -62,6 +62,8 @@ public class TsFileInsertionEventQueryParserTabletIterator 
implements Iterator<T
 
   private final PipeMemoryBlock allocatedBlockForTablet;
 
+  private RowRecord rowRecord;
+
   TsFileInsertionEventQueryParserTabletIterator(
       final TsFileReader tsFileReader,
       final Map<String, TSDataType> measurementDataTypeMap,
@@ -135,16 +137,15 @@ public class 
TsFileInsertionEventQueryParserTabletIterator implements Iterator<T
               // Used for tree model
               deviceId.toString(), schemas, 1);
       tablet.initBitMaps();
-      // Ignore the memory cost of tablet
-      
PipeDataNodeResourceManager.memory().forceResize(allocatedBlockForTablet, 0);
       return tablet;
     }
 
     boolean isFirstRow = true;
     while (queryDataSet.hasNext()) {
-      final RowRecord rowRecord = queryDataSet.next();
+      final RowRecord rowRecord = this.rowRecord != null ? this.rowRecord : 
queryDataSet.next();
       if (isFirstRow) {
         // Calculate row count and memory size of the tablet based on the 
first row
+        this.rowRecord = rowRecord; // Save the first row for later use
         Pair<Integer, Integer> rowCountAndMemorySize =
             PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(rowRecord);
         tablet =
@@ -152,8 +153,11 @@ public class TsFileInsertionEventQueryParserTabletIterator 
implements Iterator<T
                 // Used for tree model
                 deviceId.toString(), schemas, rowCountAndMemorySize.getLeft());
         tablet.initBitMaps();
-        PipeDataNodeResourceManager.memory()
-            .forceResize(allocatedBlockForTablet, 
rowCountAndMemorySize.getRight());
+        if (allocatedBlockForTablet.getMemoryUsageInBytes() < 
rowCountAndMemorySize.getRight()) {
+          PipeDataNodeResourceManager.memory()
+              .forceResize(allocatedBlockForTablet, 
rowCountAndMemorySize.getRight());
+        }
+        this.rowRecord = null; // Clear the saved first row
         isFirstRow = false;
       }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 9dcbc01bd72..064a2f22279 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -108,7 +108,9 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
 
     // Allocate empty memory block, will be resized later.
     this.allocatedMemoryBlockForBatchData =
-        
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+        PipeDataNodeResourceManager.memory()
+            .forceAllocateForTabletWithRetry(
+                
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
 
     try {
       tsFileSequenceReader = new 
TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);
@@ -134,61 +136,67 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
 
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
-    return () ->
-        new Iterator<TabletInsertionEvent>() {
-
-          @Override
-          public boolean hasNext() {
-            return Objects.nonNull(chunkReader);
-          }
-
-          @Override
-          public TabletInsertionEvent next() {
-            if (!hasNext()) {
-              close();
-              throw new NoSuchElementException();
-            }
-
-            // currentIsAligned is initialized when 
TsFileInsertionEventScanParser is constructed.
-            // When the getNextTablet function is called, currentIsAligned may 
be updated, causing
-            // the currentIsAligned information to be inconsistent with the 
current Tablet
-            // information.
-            final boolean isAligned = currentIsAligned;
-            final Tablet tablet = getNextTablet();
-            final boolean hasNext = hasNext();
-            try {
-              return sourceEvent == null
-                  ? new PipeRawTabletInsertionEvent(
-                      null,
-                      null,
-                      null,
-                      null,
-                      tablet,
-                      isAligned,
-                      null,
-                      0,
-                      pipeTaskMeta,
-                      sourceEvent,
-                      !hasNext)
-                  : new PipeRawTabletInsertionEvent(
-                      sourceEvent.getRawIsTableModelEvent(),
-                      sourceEvent.getSourceDatabaseNameFromDataRegion(),
-                      sourceEvent.getRawTableModelDataBase(),
-                      sourceEvent.getRawTreeModelDataBase(),
-                      tablet,
-                      isAligned,
-                      sourceEvent.getPipeName(),
-                      sourceEvent.getCreationTime(),
-                      pipeTaskMeta,
-                      sourceEvent,
-                      !hasNext);
-            } finally {
-              if (!hasNext) {
-                close();
-              }
-            }
-          }
-        };
+    if (tabletInsertionIterable == null) {
+      tabletInsertionIterable =
+          () ->
+              new Iterator<TabletInsertionEvent>() {
+
+                @Override
+                public boolean hasNext() {
+                  return Objects.nonNull(chunkReader);
+                }
+
+                @Override
+                public TabletInsertionEvent next() {
+                  if (!hasNext()) {
+                    close();
+                    throw new NoSuchElementException();
+                  }
+
+                  // currentIsAligned is initialized when
+                  // TsFileInsertionEventScanParser is constructed, and calling
+                  // getNextTablet() may update it. Read the flag before calling
+                  // getNextTablet(); otherwise the currentIsAligned information
+                  // could be inconsistent with the Tablet that is returned
+                  // below.
+                  final boolean isAligned = currentIsAligned;
+                  final Tablet tablet = getNextTablet();
+                  final boolean hasNext = hasNext();
+                  try {
+                    return sourceEvent == null
+                        ? new PipeRawTabletInsertionEvent(
+                            null,
+                            null,
+                            null,
+                            null,
+                            tablet,
+                            isAligned,
+                            null,
+                            0,
+                            pipeTaskMeta,
+                            sourceEvent,
+                            !hasNext)
+                        : new PipeRawTabletInsertionEvent(
+                            sourceEvent.getRawIsTableModelEvent(),
+                            sourceEvent.getSourceDatabaseNameFromDataRegion(),
+                            sourceEvent.getRawTableModelDataBase(),
+                            sourceEvent.getRawTreeModelDataBase(),
+                            tablet,
+                            isAligned,
+                            sourceEvent.getPipeName(),
+                            sourceEvent.getCreationTime(),
+                            pipeTaskMeta,
+                            sourceEvent,
+                            !hasNext);
+                  } finally {
+                    if (!hasNext) {
+                      close();
+                    }
+                  }
+                }
+              };
+    }
+    return tabletInsertionIterable;
   }
 
   public Iterable<Pair<Tablet, Boolean>> toTabletWithIsAligneds() {
@@ -231,8 +239,6 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
       if (!data.hasCurrent()) {
         tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1);
         tablet.initBitMaps();
-        // Ignore the memory cost of tablet
-        
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 
0);
         return tablet;
       }
 
@@ -248,8 +254,11 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
                 new Tablet(
                     currentDevice.toString(), currentMeasurements, 
rowCountAndMemorySize.getLeft());
             tablet.initBitMaps();
-            PipeDataNodeResourceManager.memory()
-                .forceResize(allocatedMemoryBlockForTablet, 
rowCountAndMemorySize.getRight());
+            if (allocatedMemoryBlockForTablet.getMemoryUsageInBytes()
+                < rowCountAndMemorySize.getRight()) {
+              PipeDataNodeResourceManager.memory()
+                  .forceResize(allocatedMemoryBlockForTablet, 
rowCountAndMemorySize.getRight());
+            }
             isFirstRow = false;
           }
 
@@ -272,8 +281,6 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
       if (tablet == null) {
         tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1);
         tablet.initBitMaps();
-        // Ignore the memory cost of tablet
-        
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 
0);
       }
 
       // Switch chunk reader iff current chunk is all consumed
@@ -300,10 +307,10 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
 
       do {
         data = chunkReader.nextPageData();
-        PipeDataNodeResourceManager.memory()
-            .forceResize(
-                allocatedMemoryBlockForBatchData,
-                PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data));
+        long size = PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data);
+        if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < size) {
+          
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForBatchData,
 size);
+        }
       } while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage());
     } while (!data.hasCurrent());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index 8011f9c3830..1ed755f5660 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -66,14 +67,15 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
     super(pipeName, creationTime, null, pattern, startTime, endTime, 
pipeTaskMeta, sourceEvent);
 
     try {
+      long tableSize = 
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes();
       this.allocatedMemoryBlockForChunk =
-          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
       this.allocatedMemoryBlockForBatchData =
-          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
       this.allocatedMemoryBlockForChunkMeta =
-          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
       this.allocatedMemoryBlockForTableSchemas =
-          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
 
       this.startTime = startTime;
       this.endTime = endTime;
@@ -101,122 +103,127 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
 
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
-    return () ->
-        new Iterator<TabletInsertionEvent>() {
-
-          private TsFileInsertionEventTableParserTabletIterator tabletIterator;
-
-          @Override
-          public boolean hasNext() {
-            try {
-              if (tabletIterator == null) {
-                tabletIterator =
-                    new TsFileInsertionEventTableParserTabletIterator(
-                        tsFileSequenceReader,
-                        entry ->
-                            (Objects.isNull(tablePattern)
-                                    || 
tablePattern.matchesTable(entry.getKey()))
-                                && hasTablePrivilege(entry.getKey()),
-                        allocatedMemoryBlockForTablet,
-                        allocatedMemoryBlockForBatchData,
-                        allocatedMemoryBlockForChunk,
-                        allocatedMemoryBlockForChunkMeta,
-                        allocatedMemoryBlockForTableSchemas,
-                        startTime,
-                        endTime);
-              }
-              if (!tabletIterator.hasNext()) {
-                close();
-                return false;
-              }
-              return true;
-            } catch (Exception e) {
-              close();
-              throw new PipeException("Error while parsing tsfile insertion 
event", e);
-            }
-          }
-
-          private boolean hasTablePrivilege(final String tableName) {
-            return Objects.isNull(userName)
-                || Objects.isNull(sourceEvent)
-                || Objects.isNull(sourceEvent.getTableModelDatabaseName())
-                || Coordinator.getInstance()
-                    .getAccessControl()
-                    .checkCanSelectFromTable4Pipe(
-                        userName,
-                        new QualifiedObjectName(
-                            sourceEvent.getTableModelDatabaseName(), 
tableName));
-          }
-
-          @Override
-          public TabletInsertionEvent next() {
-            if (!hasNext()) {
-              close();
-              throw new NoSuchElementException();
-            }
-
-            final Tablet tablet = tabletIterator.next();
-
-            final TabletInsertionEvent next;
-            if (!hasNext()) {
-              next =
-                  sourceEvent == null
-                      ? new PipeRawTabletInsertionEvent(
-                          Boolean.TRUE,
-                          null,
-                          null,
-                          null,
-                          tablet,
-                          true,
-                          null,
-                          0,
-                          pipeTaskMeta,
-                          sourceEvent,
-                          true)
-                      : new PipeRawTabletInsertionEvent(
-                          Boolean.TRUE,
-                          sourceEvent.getSourceDatabaseNameFromDataRegion(),
-                          sourceEvent.getRawTableModelDataBase(),
-                          sourceEvent.getRawTreeModelDataBase(),
-                          tablet,
-                          true,
-                          sourceEvent.getPipeName(),
-                          sourceEvent.getCreationTime(),
-                          pipeTaskMeta,
-                          sourceEvent,
-                          true);
-              close();
-            } else {
-              next =
-                  sourceEvent == null
-                      ? new PipeRawTabletInsertionEvent(
-                          Boolean.TRUE,
-                          null,
-                          null,
-                          null,
-                          tablet,
-                          true,
-                          null,
-                          0,
-                          pipeTaskMeta,
-                          sourceEvent,
-                          false)
-                      : new PipeRawTabletInsertionEvent(
-                          Boolean.TRUE,
-                          sourceEvent.getSourceDatabaseNameFromDataRegion(),
-                          sourceEvent.getRawTableModelDataBase(),
-                          sourceEvent.getRawTreeModelDataBase(),
-                          tablet,
-                          true,
-                          sourceEvent.getPipeName(),
-                          sourceEvent.getCreationTime(),
-                          pipeTaskMeta,
-                          sourceEvent,
-                          false);
-            }
-            return next;
-          }
-        };
+    if (tabletInsertionIterable == null) {
+      tabletInsertionIterable =
+          () ->
+              new Iterator<TabletInsertionEvent>() {
+
+                private TsFileInsertionEventTableParserTabletIterator 
tabletIterator;
+
+                @Override
+                public boolean hasNext() {
+                  try {
+                    if (tabletIterator == null) {
+                      tabletIterator =
+                          new TsFileInsertionEventTableParserTabletIterator(
+                              tsFileSequenceReader,
+                              entry ->
+                                  (Objects.isNull(tablePattern)
+                                          || 
tablePattern.matchesTable(entry.getKey()))
+                                      && hasTablePrivilege(entry.getKey()),
+                              allocatedMemoryBlockForTablet,
+                              allocatedMemoryBlockForBatchData,
+                              allocatedMemoryBlockForChunk,
+                              allocatedMemoryBlockForChunkMeta,
+                              allocatedMemoryBlockForTableSchemas,
+                              startTime,
+                              endTime);
+                    }
+                    if (!tabletIterator.hasNext()) {
+                      close();
+                      return false;
+                    }
+                    return true;
+                  } catch (Exception e) {
+                    close();
+                    throw new PipeException("Error while parsing tsfile 
insertion event", e);
+                  }
+                }
+
+                private boolean hasTablePrivilege(final String tableName) {
+                  return Objects.isNull(userName)
+                      || Objects.isNull(sourceEvent)
+                      || 
Objects.isNull(sourceEvent.getTableModelDatabaseName())
+                      || Coordinator.getInstance()
+                          .getAccessControl()
+                          .checkCanSelectFromTable4Pipe(
+                              userName,
+                              new QualifiedObjectName(
+                                  sourceEvent.getTableModelDatabaseName(), 
tableName));
+                }
+
+                @Override
+                public TabletInsertionEvent next() {
+                  if (!hasNext()) {
+                    close();
+                    throw new NoSuchElementException();
+                  }
+
+                  final Tablet tablet = tabletIterator.next();
+
+                  final TabletInsertionEvent next;
+                  if (!hasNext()) {
+                    next =
+                        sourceEvent == null
+                            ? new PipeRawTabletInsertionEvent(
+                                Boolean.TRUE,
+                                null,
+                                null,
+                                null,
+                                tablet,
+                                true,
+                                null,
+                                0,
+                                pipeTaskMeta,
+                                sourceEvent,
+                                true)
+                            : new PipeRawTabletInsertionEvent(
+                                Boolean.TRUE,
+                                
sourceEvent.getSourceDatabaseNameFromDataRegion(),
+                                sourceEvent.getRawTableModelDataBase(),
+                                sourceEvent.getRawTreeModelDataBase(),
+                                tablet,
+                                true,
+                                sourceEvent.getPipeName(),
+                                sourceEvent.getCreationTime(),
+                                pipeTaskMeta,
+                                sourceEvent,
+                                true);
+                    close();
+                  } else {
+                    next =
+                        sourceEvent == null
+                            ? new PipeRawTabletInsertionEvent(
+                                Boolean.TRUE,
+                                null,
+                                null,
+                                null,
+                                tablet,
+                                true,
+                                null,
+                                0,
+                                pipeTaskMeta,
+                                sourceEvent,
+                                false)
+                            : new PipeRawTabletInsertionEvent(
+                                Boolean.TRUE,
+                                
sourceEvent.getSourceDatabaseNameFromDataRegion(),
+                                sourceEvent.getRawTableModelDataBase(),
+                                sourceEvent.getRawTreeModelDataBase(),
+                                tablet,
+                                true,
+                                sourceEvent.getPipeName(),
+                                sourceEvent.getCreationTime(),
+                                pipeTaskMeta,
+                                sourceEvent,
+                                false);
+                  }
+                  return next;
+                }
+              };
+    }
+
+    return tabletInsertionIterable;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
index 4f114bb9cbb..91f0f3baa2f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
@@ -138,8 +138,10 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
       tableSchemaSize +=
           tableSchemaEntry.getKey().length()
               + 
PipeMemoryWeightUtil.calculateTableSchemaBytesUsed(tableSchemaEntry.getValue());
-      PipeDataNodeResourceManager.memory()
-          .forceResize(this.allocatedMemoryBlockForTableSchema, 
tableSchemaSize);
+      if (tableSchemaSize > 
allocatedMemoryBlockForTableSchema.getMemoryUsageInBytes()) {
+        PipeDataNodeResourceManager.memory()
+            .forceResize(this.allocatedMemoryBlockForTableSchema, 
tableSchemaSize);
+      }
     }
 
     filteredTableSchemaIterator = tableSchemaList.iterator();
@@ -158,10 +160,11 @@ public class 
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
           case INIT_DATA:
             if (chunkReader != null && chunkReader.hasNextSatisfiedPage()) {
               batchData = chunkReader.nextPageData();
-              PipeDataNodeResourceManager.memory()
-                  .forceResize(
-                      allocatedMemoryBlockForBatchData,
-                      
PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(batchData));
+              final long size = 
PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(batchData);
+              if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < 
size) {
+                PipeDataNodeResourceManager.memory()
+                    .forceResize(allocatedMemoryBlockForBatchData, size);
+              }
               state = State.CHECK_DATA;
               break;
             }
@@ -204,8 +207,10 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
 
                 size +=
                     
PipeMemoryWeightUtil.calculateAlignedChunkMetaBytesUsed(alignedChunkMetadata);
-                PipeDataNodeResourceManager.memory()
-                    .forceResize(allocatedMemoryBlockForChunkMeta, size);
+                if (allocatedMemoryBlockForChunkMeta.getMemoryUsageInBytes() < 
size) {
+                  PipeDataNodeResourceManager.memory()
+                      .forceResize(allocatedMemoryBlockForChunkMeta, size);
+                }
               }
 
               deviceID = pair.getLeft();
@@ -284,8 +289,11 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
           // Calculate row count and memory size of the tablet based on the 
first row
           final Pair<Integer, Integer> rowCountAndMemorySize =
               PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData);
-          PipeDataNodeResourceManager.memory()
-              .forceResize(allocatedMemoryBlockForTablet, 
rowCountAndMemorySize.getLeft());
+          if (allocatedMemoryBlockForTablet.getMemoryUsageInBytes()
+              < rowCountAndMemorySize.getRight()) {
+            PipeDataNodeResourceManager.memory()
+                .forceResize(allocatedMemoryBlockForTablet, 
rowCountAndMemorySize.getRight());
+          }
 
           tablet =
               new Tablet(
@@ -313,7 +321,6 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
     }
 
     if (isFirstRow) {
-      
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 
0);
       tablet = new Tablet(tableName, measurementList, dataTypeList, 
columnTypes, 0);
       tablet.initBitMaps();
     }
@@ -326,7 +333,10 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
     if (Objects.isNull(timeChunk)) {
       timeChunk = reader.readMemChunk((ChunkMetadata) 
alignedChunkMetadata.getTimeChunkMetadata());
       timeChunkSize = 
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
-      
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, 
timeChunkSize);
+      if (allocatedMemoryBlockForChunk.getMemoryUsageInBytes() < 
timeChunkSize) {
+        PipeDataNodeResourceManager.memory()
+            .forceResize(allocatedMemoryBlockForChunk, timeChunkSize);
+      }
     }
     timeChunk.getData().rewind();
     long size = timeChunkSize;
@@ -360,7 +370,9 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
 
         final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata);
         size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
-        
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, 
size);
+        if (allocatedMemoryBlockForChunk.getMemoryUsageInBytes() < size) {
+          
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, 
size);
+        }
 
         valueChunkList.add(chunk);
       }


Reply via email to