This is an automated email from the ASF dual-hosted git repository.

yschengzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new afde755d4f7 [IOTDB-6273] Load: Memory Management Framework (#11756)
afde755d4f7 is described below

commit afde755d4f7d68013282e8e83701b4e20ada9e48
Author: Itami Sho <[email protected]>
AuthorDate: Fri Dec 29 19:45:21 2023 +0800

    [IOTDB-6273] Load: Memory Management Framework (#11756)
    
    * done
    
    * fix
    
    * refactor tsfile analyze schema
    
    * add memory control for data cache
    
    * add release policy for data cache memory block
    
    * fix iterator
    
    * try to load liu zhen dataset
    
    * fix send data and improve writing deletions
    
    * improve deletion data writing on load tsfile manager
    
    * done
    
    * fix
    
    * refactor tsfile analyze schema
    
    * add memory control for data cache
    
    * add release policy for data cache memory block
    
    * fix iterator
    
    * try to load liu zhen dataset
    
    * fix send data and improve writing deletions
    
    * improve deletion data writing on load tsfile manager
    
    * add LoadMemoryMetrics
    
    * remove debug log
    
    * fix config
    
    * queryEngine provides tryAllocate, forceAllocate & release methods for load
    
    * fix UsedMemorySize calculation inaccuracy
    
    * adjust args
    
    ---------
    
    Co-authored-by: yschengzi <[email protected]>
---
 .../it/env/cluster/config/MppDataNodeConfig.java   |   9 +
 .../it/env/remote/config/RemoteDataNodeConfig.java |   6 +
 .../apache/iotdb/itbase/env/DataNodeConfig.java    |   3 +
 .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java  |   4 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  57 ++++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  11 +-
 .../exception/LoadRuntimeOutOfMemoryException.java |  15 +-
 .../db/queryengine/execution/load/ChunkData.java   |   3 +
 .../queryengine/execution/load/DeletionData.java   |  17 +-
 .../execution/load/LoadTsFileManager.java          |  51 +++-
 .../db/queryengine/execution/load/TsFileData.java  |   3 -
 .../load/LoadTsFileAbstractMemoryBlock.java        |  66 +++++
 .../load/LoadTsFileAnalyzeSchemaMemoryBlock.java   |  98 +++++++
 .../load/LoadTsFileDataCacheMemoryBlock.java       | 147 +++++++++++
 .../queryengine/load/LoadTsFileMemoryManager.java  | 149 +++++++++++
 .../queryengine/metric/LoadTsFileMemMetricSet.java | 101 ++++++++
 .../plan/analyze/LoadTsfileAnalyzer.java           | 284 +++++++++++++++++----
 .../plan/planner/LocalExecutionPlanner.java        |  33 ++-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  34 ++-
 .../db/service/metrics/DataNodeMetricsHelper.java  |   4 +
 .../dataregion/modification/Deletion.java          |   4 +
 .../dataregion/modification/ModificationFile.java  |  19 ++
 .../io/LocalTextModificationAccessor.java          |   9 +-
 .../modification/io/ModificationWriter.java        |   9 +
 .../iotdb/commons/service/metric/enums/Metric.java |   3 +-
 ...leSequenceReaderTimeseriesMetadataIterator.java |  42 ++-
 ...quenceReaderTimeseriesMetadataIteratorTest.java |   8 +-
 27 files changed, 1076 insertions(+), 113 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index c188ad388c4..0f22e0d4286 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -67,4 +67,13 @@ public class MppDataNodeConfig extends MppBaseConfig 
implements DataNodeConfig {
     properties.setProperty("dn_connection_timeout_ms", 
String.valueOf(connectionTimeoutInMS));
     return this;
   }
+
+  @Override
+  public DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
+      long loadTsFileAnalyzeSchemaMemorySizeInBytes) {
+    properties.setProperty(
+        "load_tsfile_analyze_schema_memory_size_in_bytes",
+        String.valueOf(loadTsFileAnalyzeSchemaMemorySizeInBytes));
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index 19f0ab36132..fe89997bc41 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -37,4 +37,10 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
   public DataNodeConfig setConnectionTimeoutInMS(int connectionTimeoutInMS) {
     return this;
   }
+
+  @Override
+  public DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
+      long loadTsFileAnalyzeSchemaMemorySizeInBytes) {
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index db39fa62b42..2887b0a9871 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -28,4 +28,7 @@ public interface DataNodeConfig {
   DataNodeConfig setEnableRestService(boolean enableRestService);
 
   DataNodeConfig setConnectionTimeoutInMS(int connectionTimeoutInMS);
+
+  DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
+      long loadTsFileAnalyzeSchemaMemorySizeInBytes);
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
index b449afc553b..d887bc5a53b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
@@ -65,6 +65,7 @@ public class IOTDBLoadTsFileIT {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IOTDBLoadTsFileIT.class);
   private static final long PARTITION_INTERVAL = 10 * 1000L;
   private static final int connectionTimeoutInMS = (int) 
TimeUnit.SECONDS.toMillis(300);
+  private static final long loadTsFileAnalyzeSchemaMemorySizeInBytes = 10 * 
1024L;
 
   private File tmpDir;
 
@@ -75,7 +76,8 @@ public class IOTDBLoadTsFileIT {
     EnvFactory.getEnv()
         .getConfig()
         .getDataNodeConfig()
-        .setConnectionTimeoutInMS(connectionTimeoutInMS);
+        .setConnectionTimeoutInMS(connectionTimeoutInMS)
+        
.setLoadTsFileAnalyzeSchemaMemorySizeInBytes(loadTsFileAnalyzeSchemaMemorySizeInBytes);
     EnvFactory.getEnv().initClusterEnvironment();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5cc59bd4837..a12192cf29c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -174,8 +174,6 @@ public class IoTDBConfig {
   /** The proportion of write memory for loading TsFile */
   private double loadTsFileProportion = 0.125;
 
-  private int maxLoadingTimeseriesNumber = 2000;
-
   /**
    * If memory cost of data region increased more than proportion of {@linkplain
    * IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain
@@ -235,6 +233,7 @@ public class IoTDBConfig {
 
   /** The period when outdated wal files are periodically deleted. Unit: millisecond */
   private volatile long deleteWalFilesPeriodInMs = 20 * 1000L;
+
   // endregion
 
   /**
@@ -543,6 +542,7 @@ public class IoTDBConfig {
    * tasks containing mods files are selected first.
    */
   private long innerCompactionTaskSelectionModsFileThreshold = 10 * 1024 * 
1024L;
+
   /**
    * When disk availability is lower than the sum of (disk_space_warning_threshold +
    * inner_compaction_task_selection_disk_redundancy), inner compaction tasks containing mods files
@@ -968,6 +968,7 @@ public class IoTDBConfig {
 
   /** Number of queues per forwarding trigger */
   private int triggerForwardMaxQueueNumber = 8;
+
   /** The length of one of the queues per forwarding trigger */
   private int triggerForwardMaxSizePerQueue = 2000;
 
@@ -1093,6 +1094,15 @@ public class IoTDBConfig {
   private int maxPendingBatchesNum = 5;
   private double maxMemoryRatioForQueue = 0.6;
 
+  /** Load related */
+  private int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber = 4096;
+
+  private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
+      0; // 0 means that the decision will be adaptive based on the number of sequences
+
+  private long loadMemoryAllocateRetryIntervalMs = 1000;
+  private int loadMemoryAllocateMaxRetries = 5;
+
   /** Pipe related */
   /** initialized as empty, updated based on the latest `systemDir` during querying */
   private String[] pipeReceiverFileDirs = new String[0];
@@ -3273,14 +3283,6 @@ public class IoTDBConfig {
     return loadTsFileProportion;
   }
 
-  public int getMaxLoadingTimeseriesNumber() {
-    return maxLoadingTimeseriesNumber;
-  }
-
-  public void setMaxLoadingTimeseriesNumber(int maxLoadingTimeseriesNumber) {
-    this.maxLoadingTimeseriesNumber = maxLoadingTimeseriesNumber;
-  }
-
   public static String getEnvironmentVariables() {
     return "\n\t"
         + IoTDBConstant.IOTDB_HOME
@@ -3735,6 +3737,41 @@ public class IoTDBConfig {
     return modeMapSizeThreshold;
   }
 
+  public int getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber() {
+    return loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber;
+  }
+
+  public void setLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber(
+      int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber) {
+    this.loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber =
+        loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber;
+  }
+
+  public long getLoadTsFileAnalyzeSchemaMemorySizeInBytes() {
+    return loadTsFileAnalyzeSchemaMemorySizeInBytes;
+  }
+
+  public void setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
+      long loadTsFileAnalyzeSchemaMemorySizeInBytes) {
+    this.loadTsFileAnalyzeSchemaMemorySizeInBytes = 
loadTsFileAnalyzeSchemaMemorySizeInBytes;
+  }
+
+  public long getLoadMemoryAllocateRetryIntervalMs() {
+    return loadMemoryAllocateRetryIntervalMs;
+  }
+
+  public void setLoadMemoryAllocateRetryIntervalMs(long 
loadMemoryAllocateRetryIntervalMs) {
+    this.loadMemoryAllocateRetryIntervalMs = loadMemoryAllocateRetryIntervalMs;
+  }
+
+  public int getLoadMemoryAllocateMaxRetries() {
+    return loadMemoryAllocateMaxRetries;
+  }
+
+  public void setLoadMemoryAllocateMaxRetries(int 
loadMemoryAllocateMaxRetries) {
+    this.loadMemoryAllocateMaxRetries = loadMemoryAllocateMaxRetries;
+  }
+
   public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
     this.pipeReceiverFileDirs = pipeReceiverFileDirs;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6a8eb822521..494e9d5d1c4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -893,11 +893,16 @@ public class IoTDBDescriptor {
       conf.setIntoOperationExecutionThreadCount(2);
     }
 
-    conf.setMaxLoadingTimeseriesNumber(
+    conf.setLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber(
         Integer.parseInt(
             properties.getProperty(
-                "max_loading_timeseries_number",
-                String.valueOf(conf.getMaxLoadingTimeseriesNumber()))));
+                "load_tsfile_analyze_schema_batch_flush_time_series_number",
+                
String.valueOf(conf.getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber()))));
+    conf.setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
+        Long.parseLong(
+            properties.getProperty(
+                "load_tsfile_analyze_schema_memory_size_in_bytes",
+                
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
 
     conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", 
conf.getExtPipeDir()).trim());
 
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadRuntimeOutOfMemoryException.java
similarity index 67%
copy from 
integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadRuntimeOutOfMemoryException.java
index db39fa62b42..050274e9940 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadRuntimeOutOfMemoryException.java
@@ -17,15 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.itbase.env;
+package org.apache.iotdb.db.exception;
 
-import java.util.List;
-
-/** This interface is used to handle properties in iotdb-datanode.properties. */
-public interface DataNodeConfig {
-  DataNodeConfig setMetricReporterType(List<String> metricReporterTypes);
-
-  DataNodeConfig setEnableRestService(boolean enableRestService);
-
-  DataNodeConfig setConnectionTimeoutInMS(int connectionTimeoutInMS);
+public class LoadRuntimeOutOfMemoryException extends RuntimeException {
+  public LoadRuntimeOutOfMemoryException(String message) {
+    super(message);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
index a30a62e2b16..fde207266e9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -45,6 +46,8 @@ public interface ChunkData extends TsFileData {
 
   void writeDecodePage(long[] times, Object[] values, int satisfiedLength) 
throws IOException;
 
+  void writeToFileWriter(TsFileIOWriter writer) throws IOException;
+
   @Override
   default boolean isModification() {
     return false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
index 3f486af4adb..18bb50c9c6c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
@@ -23,11 +23,9 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -40,18 +38,13 @@ public class DeletionData implements TsFileData {
 
   @Override
   public long getDataSize() {
-    return Long.BYTES;
+    return deletion.getSerializedSize();
   }
 
-  @Override
-  public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
-    File tsFile = writer.getFile();
-    try (ModificationFile modificationFile =
-        new ModificationFile(tsFile.getAbsolutePath() + 
ModificationFile.FILE_SUFFIX)) {
-      writer.flush();
-      deletion.setFileOffset(tsFile.length());
-      modificationFile.write(deletion);
-    }
+  public void writeToModificationFile(ModificationFile modificationFile, long 
fileOffset)
+      throws IOException {
+    deletion.setFileOffset(fileOffset);
+    modificationFile.writeWithoutSync(deletion);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 7c92b327a39..8b8eb97946c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePie
 import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
 import 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
 import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -132,7 +133,7 @@ public class LoadTsFileManager {
         writerManager.write(
             new DataPartitionInfo(dataRegion, 
chunkData.getTimePartitionSlot()), chunkData);
       } else {
-        writerManager.writeDeletion(tsFileData);
+        writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
       }
     }
   }
@@ -198,12 +199,14 @@ public class LoadTsFileManager {
     private final File taskDir;
     private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
     private Map<DataPartitionInfo, String> dataPartition2LastDevice;
+    private Map<DataPartitionInfo, ModificationFile> 
dataPartition2ModificationFile;
     private boolean isClosed;
 
     private TsFileWriterManager(File taskDir) {
       this.taskDir = taskDir;
       this.dataPartition2Writer = new HashMap<>();
       this.dataPartition2LastDevice = new HashMap<>();
+      this.dataPartition2ModificationFile = new HashMap<>();
       this.isClosed = false;
 
       clearDir(taskDir);
@@ -245,12 +248,32 @@ public class LoadTsFileManager {
       chunkData.writeToFileWriter(writer);
     }
 
-    private void writeDeletion(TsFileData deletionData) throws IOException {
+    private void writeDeletion(DataRegion dataRegion, DeletionData 
deletionData)
+        throws IOException {
       if (isClosed) {
         throw new 
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
       }
       for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : 
dataPartition2Writer.entrySet()) {
-        deletionData.writeToFileWriter(entry.getValue());
+        final DataPartitionInfo partitionInfo = entry.getKey();
+        if (partitionInfo.getDataRegion().equals(dataRegion)) {
+          final TsFileIOWriter writer = entry.getValue();
+          if (!dataPartition2ModificationFile.containsKey(partitionInfo)) {
+            File newModificationFile =
+                SystemFileFactory.INSTANCE.getFile(
+                    writer.getFile().getAbsolutePath() + 
ModificationFile.FILE_SUFFIX);
+            if (!newModificationFile.createNewFile()) {
+              LOGGER.error(
+                  "Can not create ModificationFile {} for writing.", 
newModificationFile.getPath());
+              return;
+            }
+
+            dataPartition2ModificationFile.put(
+                partitionInfo, new 
ModificationFile(newModificationFile.getAbsolutePath()));
+          }
+          ModificationFile modificationFile = 
dataPartition2ModificationFile.get(partitionInfo);
+          writer.flush();
+          deletionData.writeToModificationFile(modificationFile, 
writer.getFile().length());
+        }
       }
     }
 
@@ -258,6 +281,10 @@ public class LoadTsFileManager {
       if (isClosed) {
         throw new 
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
       }
+      for (Map.Entry<DataPartitionInfo, ModificationFile> entry :
+          dataPartition2ModificationFile.entrySet()) {
+        entry.getValue().close();
+      }
       for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : 
dataPartition2Writer.entrySet()) {
         TsFileIOWriter writer = entry.getValue();
         if (writer.isWritingChunkGroup()) {
@@ -302,7 +329,7 @@ public class LoadTsFileManager {
       if (dataPartition2Writer != null) {
         for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : 
dataPartition2Writer.entrySet()) {
           try {
-            TsFileIOWriter writer = entry.getValue();
+            final TsFileIOWriter writer = entry.getValue();
             if (writer.canWrite()) {
               writer.close();
             }
@@ -315,6 +342,21 @@ public class LoadTsFileManager {
           }
         }
       }
+      if (dataPartition2ModificationFile != null) {
+        for (Map.Entry<DataPartitionInfo, ModificationFile> entry :
+            dataPartition2ModificationFile.entrySet()) {
+          try {
+            final ModificationFile modificationFile = entry.getValue();
+            modificationFile.close();
+            final Path modificationFilePath = new 
File(modificationFile.getFilePath()).toPath();
+            if (Files.exists(modificationFilePath)) {
+              Files.delete(modificationFilePath);
+            }
+          } catch (IOException e) {
+            LOGGER.warn("Close ModificationFile {} error.", 
entry.getValue().getFilePath(), e);
+          }
+        }
+      }
       try {
         Files.delete(taskDir.toPath());
       } catch (DirectoryNotEmptyException e) {
@@ -324,6 +366,7 @@ public class LoadTsFileManager {
       }
       dataPartition2Writer = null;
       dataPartition2LastDevice = null;
+      dataPartition2ModificationFile = null;
       isClosed = true;
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
index a53a3f5fc30..c5df4f3f8d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.queryengine.execution.load;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.tsfile.exception.write.PageException;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -31,8 +30,6 @@ import java.io.InputStream;
 public interface TsFileData {
   long getDataSize();
 
-  void writeToFileWriter(TsFileIOWriter writer) throws IOException;
-
   boolean isModification();
 
   void serialize(DataOutputStream stream) throws IOException;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java
new file mode 100644
index 00000000000..4f2ad607ccb
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.load;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class LoadTsFileAbstractMemoryBlock implements AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LoadTsFileAbstractMemoryBlock.class);
+  protected static final LoadTsFileMemoryManager MEMORY_MANAGER =
+      LoadTsFileMemoryManager.getInstance();
+
+  private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+  public boolean hasEnoughMemory() {
+    return hasEnoughMemory(0L);
+  }
+
+  public abstract boolean hasEnoughMemory(long memoryTobeAddedInBytes);
+
+  public abstract void addMemoryUsage(long memoryInBytes);
+
+  public abstract void reduceMemoryUsage(long memoryInBytes);
+
+  /**
+   * Release all memory of this block.
+   *
+   * <p>NOTE: This method should be called only by {@link LoadTsFileAbstractMemoryBlock#close()}.
+   */
+  protected abstract void releaseAllMemory();
+
+  public boolean isClosed() {
+    return isClosed.get();
+  }
+
+  @Override
+  public void close() {
+    if (isClosed.compareAndSet(false, true)) {
+      try {
+        releaseAllMemory();
+      } catch (Exception e) {
+        LOGGER.error("Release memory block {} failed", this, e);
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
new file mode 100644
index 00000000000..17aa4150bcd
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.load;
+
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.queryengine.metric.LoadTsFileMemMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LoadTsFileAnalyzeSchemaMemoryBlock extends 
LoadTsFileAbstractMemoryBlock {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LoadTsFileAnalyzeSchemaMemoryBlock.class);
+
+  private final long totalMemorySizeInBytes;
+  private final AtomicLong memoryUsageInBytes;
+
+  LoadTsFileAnalyzeSchemaMemoryBlock(long totalMemorySizeInBytes) {
+    super();
+
+    this.totalMemorySizeInBytes = totalMemorySizeInBytes;
+    this.memoryUsageInBytes = new AtomicLong(0);
+  }
+
+  @Override
+  public boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
+    return memoryUsageInBytes.get() + memoryTobeAddedInBytes <= 
totalMemorySizeInBytes;
+  }
+
+  @Override
+  public void addMemoryUsage(long memoryInBytes) {
+    memoryUsageInBytes.addAndGet(memoryInBytes);
+
+    MetricService.getInstance()
+        .getOrCreateGauge(
+            Metric.LOAD_MEM.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            LoadTsFileMemMetricSet.LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY)
+        .incr(memoryInBytes);
+  }
+
+  @Override
+  public void reduceMemoryUsage(long memoryInBytes) {
+    if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) {
+      LOGGER.warn("{} has reduce memory usage to negative", this);
+    }
+
+    MetricService.getInstance()
+        .getOrCreateGauge(
+            Metric.LOAD_MEM.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            LoadTsFileMemMetricSet.LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY)
+        .decr(memoryInBytes);
+  }
+
+  @Override
+  protected void releaseAllMemory() {
+    if (memoryUsageInBytes.get() != 0) {
+      LOGGER.warn(
+          "Try to release memory from a memory block {} which has not released all memory", this);
+    }
+    MEMORY_MANAGER.releaseToQuery(totalMemorySizeInBytes);
+  }
+
+  @Override
+  public String toString() {
+    return "LoadTsFileAnalyzeSchemaMemoryBlock{"
+        + "totalMemorySizeInBytes="
+        + totalMemorySizeInBytes
+        + ", memoryUsageInBytes="
+        + memoryUsageInBytes.get()
+        + '}';
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java
new file mode 100644
index 00000000000..7a22951d672
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.load;
+
+import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Memory block for the load data cache whose limit can change while it is in use.
+ *
+ * <p>The limit starts at the size passed to the constructor. {@link #addMemoryUsage(long)}
+ * periodically asks the memory manager for {@code EACH_ASK_MEMORY_SIZE_IN_BYTES} more bytes and
+ * adds whatever is granted to the limit; {@link #doShrink(long)} lowers the limit again. A
+ * reference count is kept so that the owner can tell when the block is no longer used.
+ */
+public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBlock {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LoadTsFileDataCacheMemoryBlock.class);
+  private static final long MINIMUM_MEMORY_SIZE_IN_BYTES = 1024 * 1024L; // 1 MB
+  private static final int MAX_ASK_FOR_MEMORY_COUNT = 256; // must be a power of 2
+  private static final long EACH_ASK_MEMORY_SIZE_IN_BYTES =
+      Math.max(
+          MINIMUM_MEMORY_SIZE_IN_BYTES,
+          LoadTsFileMemoryManager.MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 4);
+
+  private final AtomicLong limitedMemorySizeInBytes;
+  private final AtomicLong memoryUsageInBytes;
+  // Cycles through 1..MAX_ASK_FOR_MEMORY_COUNT; more memory is requested only when the value is a
+  // power of 2, so unsuccessful requests become exponentially rarer.
+  private final AtomicInteger askForMemoryCount;
+  private final AtomicInteger referenceCount;
+
+  /**
+   * Creates a block with the given initial limit.
+   *
+   * @param initialLimitedMemorySizeInBytes initial limit of this block in bytes
+   * @throws LoadRuntimeOutOfMemoryException if the initial limit is below 1 MB
+   */
+  LoadTsFileDataCacheMemoryBlock(long initialLimitedMemorySizeInBytes) {
+    super();
+
+    if (initialLimitedMemorySizeInBytes < MINIMUM_MEMORY_SIZE_IN_BYTES) {
+      throw new LoadRuntimeOutOfMemoryException(
+          String.format(
+              "The initial limited memory size %d is less than the minimum memory size %d",
+              initialLimitedMemorySizeInBytes, MINIMUM_MEMORY_SIZE_IN_BYTES));
+    }
+
+    this.limitedMemorySizeInBytes = new AtomicLong(initialLimitedMemorySizeInBytes);
+    this.memoryUsageInBytes = new AtomicLong(0L);
+    this.askForMemoryCount = new AtomicInteger(1);
+    this.referenceCount = new AtomicInteger(0);
+  }
+
+  /**
+   * Tells whether {@code memoryTobeAddedInBytes} more bytes still fit under the current limit.
+   *
+   * @param memoryTobeAddedInBytes number of bytes the caller wants to add
+   * @return {@code true} if current usage plus the request does not exceed the current limit
+   */
+  @Override
+  public boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
+    return memoryUsageInBytes.get() + memoryTobeAddedInBytes <= limitedMemorySizeInBytes.get();
+  }
+
+  /**
+   * Adds {@code memoryInBytes} to the usage counter and, when the ask counter is at a power of 2,
+   * tries to enlarge the limit by {@code EACH_ASK_MEMORY_SIZE_IN_BYTES}.
+   *
+   * @param memoryInBytes number of bytes to add to the usage counter
+   */
+  @Override
+  public void addMemoryUsage(long memoryInBytes) {
+    memoryUsageInBytes.addAndGet(memoryInBytes);
+
+    // The update function must be free of side effects because AtomicInteger may re-apply it when
+    // a concurrent update wins the race. It therefore only advances the counter; the allocation
+    // happens afterwards, exactly once per call.
+    final int previousCount =
+        askForMemoryCount.getAndUpdate(count -> (count & (MAX_ASK_FOR_MEMORY_COUNT - 1)) + 1);
+    if ((previousCount & (previousCount - 1)) != 0) {
+      // not a power of 2: do not ask for more memory this time
+      return;
+    }
+
+    final long actuallyAllocateMemorySizeInBytes =
+        MEMORY_MANAGER.tryAllocateFromQuery(EACH_ASK_MEMORY_SIZE_IN_BYTES);
+    limitedMemorySizeInBytes.addAndGet(actuallyAllocateMemorySizeInBytes);
+    if (actuallyAllocateMemorySizeInBytes >= EACH_ASK_MEMORY_SIZE_IN_BYTES) {
+      // The request was granted in full, so ask again on the very next call.
+      askForMemoryCount.set(1);
+    }
+  }
+
+  /**
+   * Subtracts {@code memoryInBytes} from the usage counter. A warning is logged when the counter
+   * drops below zero, matching the analyze schema memory block.
+   *
+   * @param memoryInBytes number of bytes to subtract from the usage counter
+   */
+  @Override
+  public void reduceMemoryUsage(long memoryInBytes) {
+    if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) {
+      LOGGER.warn("{} has reduce memory usage to negative", this);
+    }
+  }
+
+  /**
+   * Hands the current limit of this block back through {@code MEMORY_MANAGER.releaseToQuery}. A
+   * warning is logged first when the usage counter is not zero.
+   */
+  @Override
+  protected void releaseAllMemory() {
+    if (memoryUsageInBytes.get() != 0) {
+      LOGGER.warn(
+          "Try to release memory from a memory block {} which has not released all memory", this);
+    }
+    MEMORY_MANAGER.releaseToQuery(limitedMemorySizeInBytes.get());
+  }
+
+  /**
+   * Lowers the limit of this block by {@code shrinkMemoryInBytes}.
+   *
+   * @param shrinkMemoryInBytes number of bytes to take away from the limit
+   * @return {@code true} if the limit was lowered (or the request was 0); {@code false} if the
+   *     request was negative or the remaining limit would not stay above 1 MB
+   */
+  public boolean doShrink(long shrinkMemoryInBytes) {
+    if (shrinkMemoryInBytes < 0) {
+      LOGGER.warn(
+          "Try to shrink a negative memory size {} from memory block {}",
+          shrinkMemoryInBytes,
+          this);
+      return false;
+    } else if (shrinkMemoryInBytes == 0) {
+      return true;
+    }
+
+    // Check and subtract in one compare-and-set step so that a concurrent change of the limit
+    // cannot slip in between the check and the update.
+    long currentLimitInBytes;
+    do {
+      currentLimitInBytes = limitedMemorySizeInBytes.get();
+      if (currentLimitInBytes - shrinkMemoryInBytes <= MINIMUM_MEMORY_SIZE_IN_BYTES) {
+        return false;
+      }
+    } while (!limitedMemorySizeInBytes.compareAndSet(
+        currentLimitInBytes, currentLimitInBytes - shrinkMemoryInBytes));
+    return true;
+  }
+
+  /** Adds {@code delta} (which may be negative) to the reference count. */
+  void updateReferenceCount(int delta) {
+    referenceCount.addAndGet(delta);
+  }
+
+  /** Returns the current reference count. */
+  int getReferenceCount() {
+    return referenceCount.get();
+  }
+
+  /** Returns the current value of the usage counter in bytes. */
+  long getMemoryUsageInBytes() {
+    return memoryUsageInBytes.get();
+  }
+
+  /** Returns the current limit of this block in bytes. */
+  long getLimitedMemorySizeInBytes() {
+    return limitedMemorySizeInBytes.get();
+  }
+
+  @Override
+  public String toString() {
+    return "LoadTsFileDataCacheMemoryBlock{"
+        + "limitedMemorySizeInBytes="
+        + limitedMemorySizeInBytes.get()
+        + ", memoryUsageInBytes="
+        + memoryUsageInBytes.get()
+        + ", askForMemoryCount="
+        + askForMemoryCount.get()
+        + '}';
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
new file mode 100644
index 00000000000..aa4d28bbd02
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.load;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Singleton that obtains memory for load from the query engine's operator memory and hands it
+ * out as analyze schema memory blocks and one shared, reference-counted data cache memory block.
+ */
+public class LoadTsFileMemoryManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileMemoryManager.class);
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private static final LocalExecutionPlanner QUERY_ENGINE_MEMORY_MANAGER =
+      LocalExecutionPlanner.getInstance();
+  public static final long MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES =
+      QUERY_ENGINE_MEMORY_MANAGER.getAllocateMemoryForOperators();
+  private static final int MEMORY_ALLOCATE_MAX_RETRIES = CONFIG.getLoadMemoryAllocateMaxRetries();
+  private static final long MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS =
+      CONFIG.getLoadMemoryAllocateRetryIntervalMs();
+
+  private final AtomicLong usedMemorySizeInBytes = new AtomicLong(0);
+  // volatile because the metric getters below read this field without holding the monitor
+  private volatile LoadTsFileDataCacheMemoryBlock dataCacheMemoryBlock;
+
+  /**
+   * Allocates exactly {@code sizeInBytes} from the query engine, waiting on this monitor between
+   * attempts until memory is released or the retry interval elapses.
+   *
+   * @param sizeInBytes number of bytes to allocate
+   * @throws LoadRuntimeOutOfMemoryException if the allocation did not succeed within the
+   *     configured number of attempts, or the waiting thread was interrupted
+   */
+  private synchronized void forceAllocatedFromQuery(long sizeInBytes)
+      throws LoadRuntimeOutOfMemoryException {
+    int attempts = 0;
+    while (attempts < MEMORY_ALLOCATE_MAX_RETRIES) {
+      attempts++;
+
+      // allocate memory from queryEngine
+      if (QUERY_ENGINE_MEMORY_MANAGER.forceAllocateFreeMemoryForOperators(sizeInBytes)) {
+        usedMemorySizeInBytes.addAndGet(sizeInBytes);
+        return;
+      }
+
+      // wait for available memory
+      try {
+        this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("forceAllocate: interrupted while waiting for available memory", e);
+        // With the interrupt flag restored every further wait() would fail immediately, so stop
+        // retrying instead of spinning through the remaining attempts.
+        break;
+      }
+    }
+
+    throw new LoadRuntimeOutOfMemoryException(
+        String.format(
+            "forceAllocate: failed to allocate memory from query engine after %d retries, "
+                + "total query memory %s, used memory size %d bytes, "
+                + "requested memory size %d bytes",
+            attempts,
+            CONFIG.getAllocateMemoryForOperators(),
+            usedMemorySizeInBytes.get(),
+            sizeInBytes));
+  }
+
+  /**
+   * Tries to allocate up to {@code sizeInBytes} from the query engine without waiting.
+   *
+   * @param sizeInBytes number of bytes requested
+   * @return number of bytes actually allocated, never negative
+   */
+  public synchronized long tryAllocateFromQuery(long sizeInBytes) {
+    long actuallyAllocateMemoryInBytes =
+        Math.max(0L, QUERY_ENGINE_MEMORY_MANAGER.tryAllocateFreeMemoryForOperators(sizeInBytes));
+    usedMemorySizeInBytes.addAndGet(actuallyAllocateMemoryInBytes);
+    return actuallyAllocateMemoryInBytes;
+  }
+
+  /**
+   * Returns {@code sizeInBytes} to the query engine and wakes up threads waiting in {@code
+   * forceAllocatedFromQuery}.
+   *
+   * @param sizeInBytes number of bytes to give back
+   */
+  public synchronized void releaseToQuery(long sizeInBytes) {
+    usedMemorySizeInBytes.addAndGet(-sizeInBytes);
+    QUERY_ENGINE_MEMORY_MANAGER.releaseToFreeMemoryForOperators(sizeInBytes);
+    this.notifyAll();
+  }
+
+  /**
+   * Creates an analyze schema memory block of {@code sizeInBytes}.
+   *
+   * <p>The memory is taken from the query engine first. If that fails and a data cache memory
+   * block exists, its limit is shrunk by {@code sizeInBytes} and the new block is created from
+   * the memory freed this way.
+   *
+   * @param sizeInBytes size of the block to create
+   * @return the newly created block
+   * @throws LoadRuntimeOutOfMemoryException if neither source can provide the memory
+   */
+  public synchronized LoadTsFileAnalyzeSchemaMemoryBlock allocateAnalyzeSchemaMemoryBlock(
+      long sizeInBytes) throws LoadRuntimeOutOfMemoryException {
+    try {
+      forceAllocatedFromQuery(sizeInBytes);
+    } catch (LoadRuntimeOutOfMemoryException e) {
+      if (dataCacheMemoryBlock != null && dataCacheMemoryBlock.doShrink(sizeInBytes)) {
+        return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);
+      }
+      throw e;
+    }
+    return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);
+  }
+
+  /**
+   * Returns the shared data cache memory block, creating it on first use, and increments its
+   * reference count.
+   *
+   * @return the shared data cache memory block
+   * @throws LoadRuntimeOutOfMemoryException if the block has to be created and less than its
+   *     minimum size could be allocated
+   */
+  public synchronized LoadTsFileDataCacheMemoryBlock allocateDataCacheMemoryBlock()
+      throws LoadRuntimeOutOfMemoryException {
+    if (dataCacheMemoryBlock == null) {
+      long actuallyAllocateMemoryInBytes =
+          tryAllocateFromQuery(MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 2);
+      try {
+        dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
+      } catch (LoadRuntimeOutOfMemoryException e) {
+        // The constructor rejects sizes below its minimum. No block owns the memory in that
+        // case, so give it back here; otherwise it would stay allocated forever.
+        if (actuallyAllocateMemoryInBytes > 0) {
+          releaseToQuery(actuallyAllocateMemoryInBytes);
+        }
+        throw e;
+      }
+      LOGGER.info(
+          "Create Data Cache Memory Block {}, allocate memory {}",
+          dataCacheMemoryBlock,
+          actuallyAllocateMemoryInBytes);
+    }
+    dataCacheMemoryBlock.updateReferenceCount(1);
+    return dataCacheMemoryBlock;
+  }
+
+  /**
+   * Decrements the reference count of the shared data cache memory block and closes the block
+   * once the count reaches zero. A call without a live block only logs a warning.
+   */
+  public synchronized void releaseDataCacheMemoryBlock() {
+    if (dataCacheMemoryBlock == null) {
+      LOGGER.warn("Try to release the Data Cache Memory Block but there is no such block");
+      return;
+    }
+
+    dataCacheMemoryBlock.updateReferenceCount(-1);
+    if (dataCacheMemoryBlock.getReferenceCount() == 0) {
+      LOGGER.info("Release Data Cache Memory Block {}", dataCacheMemoryBlock);
+      dataCacheMemoryBlock.close();
+      dataCacheMemoryBlock = null;
+    }
+  }
+
+  // used for Metrics
+  public long getUsedMemorySizeInBytes() {
+    return usedMemorySizeInBytes.get();
+  }
+
+  /** Returns the usage counter of the data cache memory block, or 0 if there is no block. */
+  public long getDataCacheUsedMemorySizeInBytes() {
+    // Read the field once: it may be set to null concurrently by releaseDataCacheMemoryBlock.
+    final LoadTsFileDataCacheMemoryBlock block = dataCacheMemoryBlock;
+    return block == null ? 0 : block.getMemoryUsageInBytes();
+  }
+
+  /** Returns the limit of the data cache memory block, or 0 if there is no block. */
+  public long getDataCacheLimitedMemorySizeInBytes() {
+    // Read the field once: it may be set to null concurrently by releaseDataCacheMemoryBlock.
+    final LoadTsFileDataCacheMemoryBlock block = dataCacheMemoryBlock;
+    return block == null ? 0 : block.getLimitedMemorySizeInBytes();
+  }
+
+  ///////////////////////////// SINGLETON /////////////////////////////
+  private LoadTsFileMemoryManager() {}
+
+  public static LoadTsFileMemoryManager getInstance() {
+    return LoadTsFileMemoryManagerHolder.INSTANCE;
+  }
+
+  public static class LoadTsFileMemoryManagerHolder {
+    private static final LoadTsFileMemoryManager INSTANCE = new LoadTsFileMemoryManager();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/LoadTsFileMemMetricSet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/LoadTsFileMemMetricSet.java
new file mode 100644
index 00000000000..7e822421302
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/LoadTsFileMemMetricSet.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+/**
+ * Metric set that publishes load memory figures under the {@code LOAD_MEM} metric: two auto
+ * gauges backed by {@link LoadTsFileMemoryManager} and one plain gauge for analyze schema
+ * memory, which is updated through its public name constant.
+ */
+public class LoadTsFileMemMetricSet implements IMetricSet {
+
+  private static final String LOAD_TSFILE_USED_MEMORY = "LoadTsFileUsedMemory";
+  public static final String LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY = "LoadTsFileAnalyzeSchemaMemory";
+
+  private static final String LOAD_TSFILE_DATA_CACHE_MEMORY = "LoadTsFileDataCacheMemory";
+
+  /**
+   * Registers the two auto gauges and creates the analyze schema gauge with an initial value of
+   * 0.
+   *
+   * @param metricService metric service to register the metrics with
+   */
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    metricService.createAutoGauge(
+        Metric.LOAD_MEM.toString(),
+        MetricLevel.IMPORTANT,
+        LoadTsFileMemoryManager.getInstance(),
+        LoadTsFileMemoryManager::getUsedMemorySizeInBytes,
+        Tag.NAME.toString(),
+        LOAD_TSFILE_USED_MEMORY);
+
+    metricService.createAutoGauge(
+        Metric.LOAD_MEM.toString(),
+        MetricLevel.IMPORTANT,
+        LoadTsFileMemoryManager.getInstance(),
+        LoadTsFileMemoryManager::getDataCacheUsedMemorySizeInBytes,
+        Tag.NAME.toString(),
+        LOAD_TSFILE_DATA_CACHE_MEMORY);
+
+    metricService
+        .getOrCreateGauge(
+            Metric.LOAD_MEM.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY)
+        .set(0L);
+  }
+
+  /**
+   * Removes the metrics registered by {@link #bindTo(AbstractMetricService)}.
+   *
+   * @param metricService metric service to remove the metrics from
+   */
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.LOAD_MEM.toString(),
+        Tag.NAME.toString(),
+        LOAD_TSFILE_USED_MEMORY);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.LOAD_MEM.toString(),
+        Tag.NAME.toString(),
+        LOAD_TSFILE_DATA_CACHE_MEMORY);
+    // This one is created with getOrCreateGauge in bindTo, so it is removed as a plain gauge.
+    metricService.remove(
+        MetricType.GAUGE,
+        Metric.LOAD_MEM.toString(),
+        Tag.NAME.toString(),
+        LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class LoadTsFileMemMetricSetHolder {
+
+    private static final LoadTsFileMemMetricSet INSTANCE = new LoadTsFileMemMetricSet();
+
+    private LoadTsFileMemMetricSetHolder() {
+      // empty constructor
+    }
+  }
+
+  public static LoadTsFileMemMetricSet getInstance() {
+    return LoadTsFileMemMetricSet.LoadTsFileMemMetricSetHolder.INSTANCE;
+  }
+
+  private LoadTsFileMemMetricSet() {
+    // empty constructor
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
index 57b15dbdc02..84860f5d847 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
@@ -33,9 +33,11 @@ import 
org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
 import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.LoadReadOnlyException;
+import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.exception.VerifyMetadataException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
@@ -45,6 +47,8 @@ import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.load.LoadTsFileAnalyzeSchemaMemoryBlock;
+import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
@@ -81,6 +85,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -92,6 +97,19 @@ public class LoadTsfileAnalyzer {
 
   private static final IClientManager<ConfigRegionId, ConfigNodeClient> 
CONFIG_NODE_CLIENT_MANAGER =
       ConfigNodeClientManager.getInstance();
+  private static final int BATCH_FLUSH_TIME_SERIES_NUMBER;
+  private static final long ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES;
+  private static final long FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES;
+
+  static {
+    final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    BATCH_FLUSH_TIME_SERIES_NUMBER = config.getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber();
+
+    // A non-positive configured size falls back to 1024 bytes per time series of one batch.
+    final long configuredMemorySizeInBytes = config.getLoadTsFileAnalyzeSchemaMemorySizeInBytes();
+    if (configuredMemorySizeInBytes <= 0) {
+      ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES = ((long) BATCH_FLUSH_TIME_SERIES_NUMBER) << 10;
+    } else {
+      ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES = configuredMemorySizeInBytes;
+    }
+
+    // Half of the analyze schema memory size.
+    FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES = ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES >> 1;
+  }
 
   private final LoadTsFileStatement loadTsFileStatement;
   private final MPPQueryContext context;
@@ -99,8 +117,7 @@ public class LoadTsfileAnalyzer {
   private final IPartitionFetcher partitionFetcher;
   private final ISchemaFetcher schemaFetcher;
 
-  private final SchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier =
-      new SchemaAutoCreatorAndVerifier();
+  private final SchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier;
 
   LoadTsfileAnalyzer(
       LoadTsFileStatement loadTsFileStatement,
@@ -112,6 +129,16 @@ public class LoadTsfileAnalyzer {
 
     this.partitionFetcher = partitionFetcher;
     this.schemaFetcher = schemaFetcher;
+
+    try {
+      this.schemaAutoCreatorAndVerifier = new SchemaAutoCreatorAndVerifier();
+    } catch (LoadRuntimeOutOfMemoryException e) {
+      LOGGER.warn("Can not allocate memory for analyze TsFile schema.", e);
+      throw new SemanticException(
+          String.format(
+              "Can not allocate memory for analyze TsFile schema when 
executing statement %s.",
+              loadTsFileStatement));
+    }
   }
 
   public Analysis analyzeFileByFile() {
@@ -150,15 +177,16 @@ public class LoadTsfileAnalyzer {
               i + 1, tsfileNum, String.format("%.3f", (i + 1) * 100.00 / 
tsfileNum));
         }
       } catch (IllegalArgumentException e) {
-        schemaAutoCreatorAndVerifier.clear();
+        schemaAutoCreatorAndVerifier.close();
         LOGGER.warn(
             "Parse file {} to resource error, this TsFile maybe empty.", 
tsFile.getPath(), e);
         throw new SemanticException(
             String.format("TsFile %s is empty or incomplete.", 
tsFile.getPath()));
       } catch (AuthException e) {
+        schemaAutoCreatorAndVerifier.close();
         return createFailAnalysisForAuthException(e);
       } catch (Exception e) {
-        schemaAutoCreatorAndVerifier.clear();
+        schemaAutoCreatorAndVerifier.close();
         LOGGER.warn("Parse file {} to resource error.", tsFile.getPath(), e);
         throw new SemanticException(
             String.format(
@@ -168,9 +196,10 @@ public class LoadTsfileAnalyzer {
 
     try {
       schemaAutoCreatorAndVerifier.flush();
-      schemaAutoCreatorAndVerifier.clear();
     } catch (AuthException e) {
       return createFailAnalysisForAuthException(e);
+    } finally {
+      schemaAutoCreatorAndVerifier.close();
     }
 
     LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed.");
@@ -185,7 +214,7 @@ public class LoadTsfileAnalyzer {
     try (final TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
       // can be reused when constructing tsfile resource
       final TsFileSequenceReaderTimeseriesMetadataIterator 
timeseriesMetadataIterator =
-          new TsFileSequenceReaderTimeseriesMetadataIterator(reader, true);
+          new TsFileSequenceReaderTimeseriesMetadataIterator(reader, true, 1);
 
       long writePointCount = 0;
 
@@ -239,7 +268,6 @@ public class LoadTsfileAnalyzer {
   }
 
   private Analysis createFailAnalysisForAuthException(AuthException e) {
-    schemaAutoCreatorAndVerifier.clear();
     Analysis analysis = new Analysis();
     analysis.setFinishQueryAfterAnalyze(true);
     analysis.setFailStatus(RpcUtils.getStatus(e.getCode(), e.getMessage()));
@@ -247,14 +275,11 @@ public class LoadTsfileAnalyzer {
   }
 
   private final class SchemaAutoCreatorAndVerifier {
+    private final LoadTsFileAnalyzeSchemaCache schemaCache;
 
-    private final Map<String, Boolean> tsfileDevice2IsAligned = new 
HashMap<>();
-    private final Map<String, Set<MeasurementSchema>> 
currentBatchDevice2TimeseriesSchemas =
-        new HashMap<>();
-
-    private final Set<PartialPath> alreadySetDatabases = new HashSet<>();
-
-    private SchemaAutoCreatorAndVerifier() {}
+    private SchemaAutoCreatorAndVerifier() throws 
LoadRuntimeOutOfMemoryException {
+      this.schemaCache = new LoadTsFileAnalyzeSchemaCache();
+    }
 
     public void autoCreateAndVerify(
         TsFileSequenceReader reader,
@@ -267,7 +292,9 @@ public class LoadTsfileAnalyzer {
         for (final TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
           final TSDataType dataType = timeseriesMetadata.getTsDataType();
           if (TSDataType.VECTOR.equals(dataType)) {
-            tsfileDevice2IsAligned.put(device, true);
+            schemaCache
+                .clearDeviceIsAlignedCacheIfNecessary(); // must execute 
before add aligned cache
+            schemaCache.addIsAlignedCache(device, true, false);
 
             // not a timeseries, skip
           } else {
@@ -301,21 +328,25 @@ public class LoadTsfileAnalyzer {
             }
             final Pair<CompressionType, TSEncoding> compressionEncodingPair =
                 
reader.readTimeseriesCompressionTypeAndEncoding(timeseriesMetadata);
-            currentBatchDevice2TimeseriesSchemas
-                .computeIfAbsent(device, o -> new HashSet<>())
-                .add(
-                    new MeasurementSchema(
-                        timeseriesMetadata.getMeasurementId(),
-                        dataType,
-                        compressionEncodingPair.getRight(),
-                        compressionEncodingPair.getLeft()));
-
-            tsfileDevice2IsAligned.putIfAbsent(device, false);
+            schemaCache.addTimeSeries(
+                device,
+                new MeasurementSchema(
+                    timeseriesMetadata.getMeasurementId(),
+                    dataType,
+                    compressionEncodingPair.getRight(),
+                    compressionEncodingPair.getLeft()));
+
+            schemaCache.addIsAlignedCache(device, false, true);
+            if (!schemaCache.getDeviceIsAligned(device)) {
+              schemaCache.clearDeviceIsAlignedCacheIfNecessary();
+            }
+          }
+
+          if (schemaCache.shouldFlushTimeSeries()) {
+            flush();
           }
         }
       }
-
-      flush();
     }
 
     /**
@@ -326,20 +357,17 @@ public class LoadTsfileAnalyzer {
         throws SemanticException, AuthException {
       // avoid OOM when loading a tsfile with too many timeseries
       // or loading too many tsfiles at the same time
-      if (tsfileDevice2IsAligned.size() > 10000) {
-        flush();
-        tsfileDevice2IsAligned.clear();
-      }
+      schemaCache.clearDeviceIsAlignedCacheIfNecessary();
     }
 
     public void flush() throws AuthException {
       doAutoCreateAndVerify();
 
-      currentBatchDevice2TimeseriesSchemas.clear();
+      schemaCache.clearTimeSeries();
     }
 
     private void doAutoCreateAndVerify() throws SemanticException, 
AuthException {
-      if (currentBatchDevice2TimeseriesSchemas.isEmpty()) {
+      if (schemaCache.getDevice2TimeSeries().isEmpty()) {
         return;
       }
 
@@ -372,7 +400,7 @@ public class LoadTsfileAnalyzer {
 
     private void makeSureNoDuplicatedMeasurementsInDevices() throws 
VerifyMetadataException {
       for (final Map.Entry<String, Set<MeasurementSchema>> entry :
-          currentBatchDevice2TimeseriesSchemas.entrySet()) {
+          schemaCache.getDevice2TimeSeries().entrySet()) {
         final String device = entry.getKey();
         final Map<String, MeasurementSchema> measurement2Schema = new 
HashMap<>();
         for (final MeasurementSchema timeseriesSchema : entry.getValue()) {
@@ -391,7 +419,7 @@ public class LoadTsfileAnalyzer {
       final int databasePrefixNodesLength = 
loadTsFileStatement.getDatabaseLevel() + 1;
       final Set<PartialPath> databasesNeededToBeSet = new HashSet<>();
 
-      for (final String device : 
currentBatchDevice2TimeseriesSchemas.keySet()) {
+      for (final String device : schemaCache.getDevice2TimeSeries().keySet()) {
         final PartialPath devicePath = new PartialPath(device);
 
         final String[] devicePrefixNodes = devicePath.getNodes();
@@ -409,7 +437,7 @@ public class LoadTsfileAnalyzer {
       }
 
       // 1. filter out the databases that already exist
-      if (alreadySetDatabases.isEmpty()) {
+      if (schemaCache.getAlreadySetDatabases().isEmpty()) {
         try (final ConfigNodeClient configNodeClient =
             
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
           final TGetDatabaseReq req =
@@ -422,13 +450,13 @@ public class LoadTsfileAnalyzer {
           final TShowDatabaseResp resp = configNodeClient.showDatabase(req);
 
           for (final String databaseName : resp.getDatabaseInfoMap().keySet()) 
{
-            alreadySetDatabases.add(new PartialPath(databaseName));
+            schemaCache.addAlreadySetDatabase(new PartialPath(databaseName));
           }
         } catch (IOException | TException | ClientManagerException e) {
           throw new LoadFileException(e);
         }
       }
-      databasesNeededToBeSet.removeAll(alreadySetDatabases);
+      databasesNeededToBeSet.removeAll(schemaCache.getAlreadySetDatabases());
 
       // 2. create the databases that do not exist
       for (final PartialPath databasePath : databasesNeededToBeSet) {
@@ -439,7 +467,7 @@ public class LoadTsfileAnalyzer {
         statement.setEnablePrintExceptionLog(false);
         executeSetDatabaseStatement(statement);
 
-        alreadySetDatabases.add(databasePath);
+        schemaCache.addAlreadySetDatabase(databasePath);
       }
     }
 
@@ -484,7 +512,7 @@ public class LoadTsfileAnalyzer {
       final List<Boolean> isAlignedList = new ArrayList<>();
 
       for (final Map.Entry<String, Set<MeasurementSchema>> entry :
-          currentBatchDevice2TimeseriesSchemas.entrySet()) {
+          schemaCache.getDevice2TimeSeries().entrySet()) {
         final int measurementSize = entry.getValue().size();
         final String[] measurements = new String[measurementSize];
         final TSDataType[] tsDataTypes = new TSDataType[measurementSize];
@@ -504,7 +532,7 @@ public class LoadTsfileAnalyzer {
         dataTypeList.add(tsDataTypes);
         encodingsList.add(encodings);
         compressionTypesList.add(compressionTypes);
-        isAlignedList.add(tsfileDevice2IsAligned.get(entry.getKey()));
+        isAlignedList.add(schemaCache.getDeviceIsAligned(entry.getKey()));
       }
 
       return SchemaValidator.validate(
@@ -521,7 +549,7 @@ public class LoadTsfileAnalyzer {
     private void verifySchema(ISchemaTree schemaTree)
         throws VerifyMetadataException, IllegalPathException {
       for (final Map.Entry<String, Set<MeasurementSchema>> entry :
-          currentBatchDevice2TimeseriesSchemas.entrySet()) {
+          schemaCache.getDevice2TimeSeries().entrySet()) {
         final String device = entry.getKey();
         final List<MeasurementSchema> tsfileTimeseriesSchemas = new 
ArrayList<>(entry.getValue());
         final DeviceSchemaInfo iotdbDeviceSchemaInfo =
@@ -540,7 +568,7 @@ public class LoadTsfileAnalyzer {
         }
 
         // check device schema: is aligned or not
-        final boolean isAlignedInTsFile = tsfileDevice2IsAligned.get(device);
+        final boolean isAlignedInTsFile = 
schemaCache.getDeviceIsAligned(device);
         final boolean isAlignedInIoTDB = iotdbDeviceSchemaInfo.isAligned();
         if (isAlignedInTsFile != isAlignedInIoTDB) {
           throw new VerifyMetadataException(
@@ -608,10 +636,174 @@ public class LoadTsfileAnalyzer {
       }
     }
 
-    public void clear() {
-      tsfileDevice2IsAligned.clear();
-      currentBatchDevice2TimeseriesSchemas.clear();
+    public void close() {
+      schemaCache.close();
+    }
+  }
+
+  private static class LoadTsFileAnalyzeSchemaCache {
+
+    private final LoadTsFileAnalyzeSchemaMemoryBlock block;
+
+    private Map<String, Set<MeasurementSchema>> 
currentBatchDevice2TimeSeriesSchemas;
+    private Map<String, Boolean> tsFileDevice2IsAligned;
+    private Set<PartialPath> alreadySetDatabases;
+
+    private long batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes = 0;
+    private long tsFileDevice2IsAlignedMemoryUsageSizeInBytes = 0;
+    private long alreadySetDatabasesMemoryUsageSizeInBytes = 0;
+
+    private int currentBatchTimeSeriesCount = 0;
+
+    public LoadTsFileAnalyzeSchemaCache() throws 
LoadRuntimeOutOfMemoryException {
+      this.block =
+          LoadTsFileMemoryManager.getInstance()
+              
.allocateAnalyzeSchemaMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
+      this.currentBatchDevice2TimeSeriesSchemas = new HashMap<>();
+      this.tsFileDevice2IsAligned = new HashMap<>();
+      this.alreadySetDatabases = new HashSet<>();
+    }
+
+    public Map<String, Set<MeasurementSchema>> getDevice2TimeSeries() {
+      return currentBatchDevice2TimeSeriesSchemas;
+    }
+
+    public boolean getDeviceIsAligned(String device) {
+      if (!tsFileDevice2IsAligned.containsKey(device)) {
+        LOGGER.warn(
+            "Device {} is not in the tsFileDevice2IsAligned cache {}.",
+            device,
+            tsFileDevice2IsAligned);
+      }
+      return tsFileDevice2IsAligned.get(device);
+    }
+
+    public Set<PartialPath> getAlreadySetDatabases() {
+      return alreadySetDatabases;
+    }
+
+    public void addTimeSeries(String device, MeasurementSchema 
measurementSchema) {
+      long memoryUsageSizeInBytes = 0;
+      if (!currentBatchDevice2TimeSeriesSchemas.containsKey(device)) {
+        memoryUsageSizeInBytes += estimateStringSize(device);
+      }
+      if (currentBatchDevice2TimeSeriesSchemas
+          .computeIfAbsent(device, k -> new HashSet<>())
+          .add(measurementSchema)) {
+        memoryUsageSizeInBytes += measurementSchema.serializedSize();
+        currentBatchTimeSeriesCount++;
+      }
+
+      if (memoryUsageSizeInBytes > 0) {
+        batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes += 
memoryUsageSizeInBytes;
+        block.addMemoryUsage(memoryUsageSizeInBytes);
+      }
+    }
+
+    public void addIsAlignedCache(String device, boolean isAligned, boolean 
addIfAbsent) {
+      long memoryUsageSizeInBytes = 0;
+      if (!tsFileDevice2IsAligned.containsKey(device)) {
+        memoryUsageSizeInBytes += estimateStringSize(device);
+      }
+      if (addIfAbsent
+          ? (tsFileDevice2IsAligned.putIfAbsent(device, isAligned) == null)
+          : (tsFileDevice2IsAligned.put(device, isAligned) == null)) {
+        memoryUsageSizeInBytes += Byte.BYTES;
+      }
+
+      if (memoryUsageSizeInBytes > 0) {
+        tsFileDevice2IsAlignedMemoryUsageSizeInBytes += memoryUsageSizeInBytes;
+        block.addMemoryUsage(memoryUsageSizeInBytes);
+      }
+    }
+
+    public void addAlreadySetDatabase(PartialPath database) {
+      long memoryUsageSizeInBytes = 0;
+      if (alreadySetDatabases.add(database)) {
+        memoryUsageSizeInBytes += PartialPath.estimateSize(database);
+      }
+
+      if (memoryUsageSizeInBytes > 0) {
+        alreadySetDatabasesMemoryUsageSizeInBytes += memoryUsageSizeInBytes;
+        block.addMemoryUsage(memoryUsageSizeInBytes);
+      }
+    }
+
+    public boolean shouldFlushTimeSeries() {
+      return !block.hasEnoughMemory()
+          || currentBatchTimeSeriesCount >= BATCH_FLUSH_TIME_SERIES_NUMBER;
+    }
+
+    public boolean shouldFlushAlignedCache() {
+      return tsFileDevice2IsAlignedMemoryUsageSizeInBytes
+          >= FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES;
+    }
+
+    public void clearTimeSeries() {
+      currentBatchDevice2TimeSeriesSchemas.clear();
+      
block.reduceMemoryUsage(batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes);
+      batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes = 0;
+      currentBatchTimeSeriesCount = 0;
+    }
+
+    public void clearAlignedCache() {
+      tsFileDevice2IsAligned.clear();
+      block.reduceMemoryUsage(tsFileDevice2IsAlignedMemoryUsageSizeInBytes);
+      tsFileDevice2IsAlignedMemoryUsageSizeInBytes = 0;
+    }
+
+    public void clearDeviceIsAlignedCacheIfNecessary() {
+      if (!shouldFlushAlignedCache()) {
+        return;
+      }
+
+      long releaseMemoryInBytes = 0;
+      final Set<String> timeSeriesCacheKeySet =
+          new HashSet<>(currentBatchDevice2TimeSeriesSchemas.keySet());
+      Iterator<Map.Entry<String, Boolean>> iterator = 
tsFileDevice2IsAligned.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<String, Boolean> entry = iterator.next();
+        if (!timeSeriesCacheKeySet.contains(entry.getKey())) {
+          releaseMemoryInBytes += estimateStringSize(entry.getKey()) + 
Byte.BYTES;
+          iterator.remove();
+        }
+      }
+      if (releaseMemoryInBytes > 0) {
+        tsFileDevice2IsAlignedMemoryUsageSizeInBytes -= releaseMemoryInBytes;
+        block.reduceMemoryUsage(releaseMemoryInBytes);
+      }
+    }
+
+    private void clearDatabasesCache() {
       alreadySetDatabases.clear();
+      block.reduceMemoryUsage(alreadySetDatabasesMemoryUsageSizeInBytes);
+      alreadySetDatabasesMemoryUsageSizeInBytes = 0;
+    }
+
+    public void close() {
+      clearTimeSeries();
+      clearAlignedCache();
+      clearDatabasesCache();
+
+      block.close();
+
+      currentBatchDevice2TimeSeriesSchemas = null;
+      tsFileDevice2IsAligned = null;
+      alreadySetDatabases = null;
+    }
+
+    /**
+     * String basic total, 32B
+     *
+     * <ul>
+     *   <li>Object header, 8B
+     *   <li>char[] reference + header + length, 8 + 4 + 8 = 20B
+     *   <li>hash code, 4B
+     * </ul>
+     */
+    private static int estimateStringSize(String string) {
+      // each char takes 2B in Java
+      return string == null ? 0 : 32 + 2 * string.length();
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index d4d51195607..0b1fc6cecd7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -45,10 +45,11 @@ import java.util.List;
 public class LocalExecutionPlanner {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(LocalExecutionPlanner.class);
+  private static final long ALLOCATE_MEMORY_FOR_OPERATORS =
+      
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators();
 
   /** allocated memory for operator execution */
-  private long freeMemoryForOperators =
-      
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators();
+  private long freeMemoryForOperators = ALLOCATE_MEMORY_FOR_OPERATORS;
 
   public long getFreeMemoryForOperators() {
     return freeMemoryForOperators;
@@ -163,6 +164,34 @@ public class LocalExecutionPlanner {
     return sourcePaths;
   }
 
+  public synchronized boolean forceAllocateFreeMemoryForOperators(long 
memoryInBytes) {
+    if (freeMemoryForOperators < memoryInBytes) {
+      return false;
+    } else {
+      freeMemoryForOperators -= memoryInBytes;
+      return true;
+    }
+  }
+
+  public synchronized long tryAllocateFreeMemoryForOperators(long 
memoryInBytes) {
+    if (freeMemoryForOperators < memoryInBytes) {
+      long result = freeMemoryForOperators;
+      freeMemoryForOperators = 0;
+      return result;
+    } else {
+      freeMemoryForOperators -= memoryInBytes;
+      return memoryInBytes;
+    }
+  }
+
+  public synchronized void releaseToFreeMemoryForOperators(long memoryInBytes) 
{
+    freeMemoryForOperators += memoryInBytes;
+  }
+
+  public long getAllocateMemoryForOperators() {
+    return ALLOCATE_MEMORY_FOR_OPERATORS;
+  }
+
   private static class InstanceHolder {
 
     private InstanceHolder() {}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 92a3ecd1047..51c4b44df38 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -46,6 +46,8 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
 import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
 import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
 import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter;
+import org.apache.iotdb.db.queryengine.load.LoadTsFileDataCacheMemoryBlock;
+import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
@@ -93,15 +95,17 @@ import java.util.stream.IntStream;
 public class LoadTsFileScheduler implements IScheduler {
   private static final Logger logger = 
LoggerFactory.getLogger(LoadTsFileScheduler.class);
   public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 900L; // 15min
-  private static final long MAX_MEMORY_SIZE;
+  private static final long SINGLE_SCHEDULER_MAX_MEMORY_SIZE;
   private static final int TRANSMIT_LIMIT;
 
   static {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    MAX_MEMORY_SIZE =
+    SINGLE_SCHEDULER_MAX_MEMORY_SIZE =
         Math.min(
             config.getThriftMaxFrameSize() >> 2,
-            (long) (config.getAllocateMemoryForStorageEngine() * 
config.getLoadTsFileProportion()));
+            (long)
+                (config.getAllocateMemoryForStorageEngine()
+                    * config.getLoadTsFileProportion())); // TODO: change it 
to query engine
     TRANSMIT_LIMIT =
         
CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit();
   }
@@ -114,6 +118,7 @@ public class LoadTsFileScheduler implements IScheduler {
   private final PlanFragmentId fragmentId;
   private final Set<TRegionReplicaSet> allReplicaSets;
   private final boolean isGeneratedByPipe;
+  private final LoadTsFileDataCacheMemoryBlock block;
 
   public LoadTsFileScheduler(
       DistributedQueryPlan distributedQueryPlan,
@@ -130,6 +135,7 @@ public class LoadTsFileScheduler implements IScheduler {
     this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
     this.allReplicaSets = new HashSet<>();
     this.isGeneratedByPipe = isGeneratedByPipe;
+    this.block = 
LoadTsFileMemoryManager.getInstance().allocateDataCacheMemoryBlock();
 
     for (FragmentInstance fragmentInstance : 
distributedQueryPlan.getInstances()) {
       tsFileNodeList.add((LoadSingleTsFileNode) 
fragmentInstance.getFragment().getPlanNodeTree());
@@ -200,10 +206,11 @@ public class LoadTsFileScheduler implements IScheduler {
     if (isLoadSuccess) {
       stateMachine.transitionToFinished();
     }
+    LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
   }
 
   private boolean firstPhase(LoadSingleTsFileNode node) {
-    final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, 
node);
+    final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, 
node, block);
     try {
       new TsFileSplitter(
               node.getTsFileResource().getTsFile(), 
tsFileDataManager::addOrSendTsFileData)
@@ -407,13 +414,18 @@ public class LoadTsFileScheduler implements IScheduler {
     private long dataSize;
     private final Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
     private final List<ChunkData> nonDirectionalChunkData;
+    private final LoadTsFileDataCacheMemoryBlock block;
 
-    public TsFileDataManager(LoadTsFileScheduler scheduler, 
LoadSingleTsFileNode singleTsFileNode) {
+    public TsFileDataManager(
+        LoadTsFileScheduler scheduler,
+        LoadSingleTsFileNode singleTsFileNode,
+        LoadTsFileDataCacheMemoryBlock block) {
       this.scheduler = scheduler;
       this.singleTsFileNode = singleTsFileNode;
       this.dataSize = 0;
       this.replicaSet2Piece = new HashMap<>();
       this.nonDirectionalChunkData = new ArrayList<>();
+      this.block = block;
     }
 
     private boolean addOrSendTsFileData(TsFileData tsFileData) {
@@ -422,11 +434,16 @@ public class LoadTsFileScheduler implements IScheduler {
           : addOrSendChunkData((ChunkData) tsFileData);
     }
 
+    private boolean isMemoryEnough() {
+      return dataSize <= SINGLE_SCHEDULER_MAX_MEMORY_SIZE && 
block.hasEnoughMemory();
+    }
+
     private boolean addOrSendChunkData(ChunkData chunkData) {
       nonDirectionalChunkData.add(chunkData);
       dataSize += chunkData.getDataSize();
+      block.addMemoryUsage(chunkData.getDataSize());
 
-      if (dataSize > MAX_MEMORY_SIZE) {
+      if (!isMemoryEnough()) {
         routeChunkData();
 
         // start to dispatch from the biggest TsFilePieceNode
@@ -446,6 +463,7 @@ public class LoadTsFileScheduler implements IScheduler {
           }
 
           dataSize -= pieceNode.getDataSize();
+          block.reduceMemoryUsage(pieceNode.getDataSize());
           replicaSet2Piece.put(
               sortedReplicaSet,
               new LoadTsFilePieceNode(
@@ -453,7 +471,7 @@ public class LoadTsFileScheduler implements IScheduler {
                   singleTsFileNode
                       .getTsFileResource()
                       .getTsFile())); // can not just remove, because of 
deletion
-          if (dataSize <= MAX_MEMORY_SIZE) {
+          if (isMemoryEnough()) {
             break;
           }
         }
@@ -492,6 +510,7 @@ public class LoadTsFileScheduler implements IScheduler {
 
       for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : 
replicaSet2Piece.entrySet()) {
         dataSize += deletionData.getDataSize();
+        block.addMemoryUsage(deletionData.getDataSize());
         entry.getValue().addTsFileData(deletionData);
       }
       return true;
@@ -501,6 +520,7 @@ public class LoadTsFileScheduler implements IScheduler {
       routeChunkData();
 
       for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : 
replicaSet2Piece.entrySet()) {
+        block.reduceMemoryUsage(entry.getValue().getDataSize());
         if (!scheduler.dispatchOnePieceNode(entry.getValue(), entry.getKey())) 
{
           logger.warn(
               "Dispatch piece node {} of TsFile {} error.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index 6e6e2afa2e0..ef06d98cebb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.pipe.metric.PipeDataNodeMetrics;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
 import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
+import org.apache.iotdb.db.queryengine.metric.LoadTsFileMemMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
@@ -85,6 +86,9 @@ public class DataNodeMetricsHelper {
 
     // bind pipe related metrics
     
MetricService.getInstance().addMetricSet(PipeDataNodeMetrics.getInstance());
+
+    // bind load tsfile memory related metrics
+    
MetricService.getInstance().addMetricSet(LoadTsFileMemMetricSet.getInstance());
   }
 
   private static void initSystemMetrics() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
index 5d0c53aa611..67c8969dd84 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
@@ -118,6 +118,10 @@ public class Deletion extends Modification implements 
Cloneable {
         new PartialPath(ReadWriteIOUtils.readString(stream)), 0, startTime, 
endTime);
   }
 
+  public long getSerializedSize() {
+    return Long.BYTES * 2 + Integer.BYTES + (long) getPathString().length() * 
Character.BYTES;
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
index 6491f4c4eaa..f4f8ab9f153 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
@@ -66,6 +66,7 @@ public class ModificationFile implements AutoCloseable {
   private static final long COMPACT_THRESHOLD = 1024 * 1024;
 
   private boolean hasCompacted = false;
+
   /**
    * Construct a ModificationFile using a file as its storage.
    *
@@ -103,6 +104,24 @@ public class ModificationFile implements AutoCloseable {
     }
   }
 
+  /**
+   * Write a modification in this file. The modification is only written through the writer of
+   * the persistent store. Notice that this method does not synchronize to physical disk after
+   * writing.
+   *
+   * @param mod the modification to be written.
+   * @throws IOException if IOException is thrown when writing the 
modification to the store.
+   */
+  public void writeWithoutSync(Modification mod) throws IOException {
+    synchronized (this) {
+      if (needVerify && new File(filePath).exists()) {
+        writer.mayTruncateLastLine();
+        needVerify = false;
+      }
+      writer.writeWithOutSync(mod);
+    }
+  }
+
   @GuardedBy("TsFileResource-WriteLock")
   public void truncate(long size) {
     writer.truncate(size);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
index 75c32adf5e8..7f666eab511 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
@@ -163,12 +163,17 @@ public class LocalTextModificationAccessor
 
   @Override
   public void write(Modification mod) throws IOException {
+    writeWithOutSync(mod);
+    force();
+  }
+
+  @Override
+  public void writeWithOutSync(Modification mod) throws IOException {
     if (fos == null) {
       fos = new FileOutputStream(filePath, true);
     }
     fos.write(encodeModification(mod).getBytes());
     fos.write(System.lineSeparator().getBytes());
-    force();
   }
 
   @TestOnly
@@ -210,7 +215,7 @@ public class LocalTextModificationAccessor
   public void mayTruncateLastLine() {
     try (RandomAccessFile file = new RandomAccessFile(filePath, "r")) {
       long filePointer = file.length() - 1;
-      if (filePointer == 0) {
+      if (filePointer <= 0) {
         return;
       }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
index 314405c633c..b4f538ac62d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
@@ -37,6 +37,15 @@ public interface ModificationWriter {
    */
   void write(Modification mod) throws IOException;
 
+  /**
+   * Write a new modification to the persistent medium. Notice that after 
calling this method, a
+   * fileWriter is opened. Notice that this method does not synchronize to 
physical disk after
+   * writing.
+   *
+   * @param mod the modification to be written.
+   */
+  void writeWithOutSync(Modification mod) throws IOException;
+
   void truncate(long size);
 
   void mayTruncateLastLine();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index d2b0eb89e1c..61aff528e38 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -147,7 +147,8 @@ public enum Metric {
   PIPE_EVENT_COMMIT_QUEUE_SIZE("pipe_event_commit_queue_size"),
   PIPE_PROCEDURE("pipe_procedure"),
   PIPE_TASK_STATUS("pipe_task_status"),
-  ;
+  // load related
+  LOAD_MEM("load_mem");
 
   final String value;
 
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIterator.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIterator.java
index 9f077c6806e..dacdbc54fc5 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIterator.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIterator.java
@@ -39,17 +39,22 @@ import java.util.NoSuchElementException;
 public class TsFileSequenceReaderTimeseriesMetadataIterator
     implements Iterator<Map<String, List<TimeseriesMetadata>>> {
 
-  private static final int MAX_TIMESERIES_METADATA_COUNT = 2000;
+  private static final int DEFAULT_TIMESERIES_BATCH_READ_NUMBER = 4000;
   private final TsFileSequenceReader reader;
   private final boolean needChunkMetadata;
+  private final int timeseriesBatchReadNumber;
   private ByteBuffer currentBuffer = null;
+  private long currentEndOffset = Long.MIN_VALUE;
   private final Deque<MetadataIndexEntryInfo> metadataIndexEntryStack = new 
ArrayDeque<>();
   private String currentDeviceId;
   private int currentTimeseriesMetadataCount = 0;
 
   public TsFileSequenceReaderTimeseriesMetadataIterator(
-      TsFileSequenceReader reader, boolean needChunkMetadata) throws 
IOException {
+      TsFileSequenceReader reader, boolean needChunkMetadata, int 
timeseriesBatchReadNumber)
+      throws IOException {
     this.reader = reader;
+    this.needChunkMetadata = needChunkMetadata;
+    this.timeseriesBatchReadNumber = timeseriesBatchReadNumber;
 
     if (this.reader.tsFileMetaData == null) {
       this.reader.readFileMetadata();
@@ -58,7 +63,6 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
     final MetadataIndexNode metadataIndexNode = 
reader.tsFileMetaData.getMetadataIndex();
     long curEntryEndOffset = metadataIndexNode.getEndOffset();
     List<MetadataIndexEntry> metadataIndexEntryList = 
metadataIndexNode.getChildren();
-    this.needChunkMetadata = needChunkMetadata;
 
     for (int i = metadataIndexEntryList.size() - 1; i >= 0; i--) {
       metadataIndexEntryStack.push(
@@ -68,6 +72,11 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
     }
   }
 
+  public TsFileSequenceReaderTimeseriesMetadataIterator(
+      TsFileSequenceReader reader, boolean needChunkMetadata) throws 
IOException {
+    this(reader, needChunkMetadata, DEFAULT_TIMESERIES_BATCH_READ_NUMBER);
+  }
+
   @Override
   public boolean hasNext() {
     return !metadataIndexEntryStack.isEmpty()
@@ -82,7 +91,7 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
 
     final Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new 
HashMap<>();
 
-    while (currentTimeseriesMetadataCount < MAX_TIMESERIES_METADATA_COUNT) {
+    while (currentTimeseriesMetadataCount < timeseriesBatchReadNumber) {
       // 1. Check Buffer
       // currentTimeseriesMetadataCount has reached the limit in the previous
       // loop and maybe there is still some data that remains in the buffer.
@@ -90,9 +99,22 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
         timeseriesMetadataMap
             .computeIfAbsent(currentDeviceId, k -> new ArrayList<>())
             .addAll(deserializeTimeseriesMetadata());
+      } else if (currentEndOffset > Long.MIN_VALUE) {
+        try {
+          timeseriesMetadataMap
+              .computeIfAbsent(currentDeviceId, k -> new ArrayList<>())
+              
.addAll(deserializeTimeseriesMetadataUsingTsFileInput(currentEndOffset));
+        } catch (IOException e) {
+          throw new TsFileSequenceReaderTimeseriesMetadataIteratorException(
+              String.format(
+                  "TsFileSequenceReaderTimeseriesMetadataIterator: 
deserializeTimeseriesMetadataUsingTsFileInput failed, "
+                      + "currentEndOffset: %d, %s",
+                  currentEndOffset,
+                  e.getMessage())); // argument, not format text: message may contain '%'
+        }
       }
 
-      if (currentTimeseriesMetadataCount >= MAX_TIMESERIES_METADATA_COUNT
+      if (currentTimeseriesMetadataCount >= timeseriesBatchReadNumber
           || metadataIndexEntryStack.isEmpty()) {
         break;
       }
@@ -113,7 +135,7 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
     }
 
     // 3. Reset currentTimeseriesMetadataCount
-    if (currentTimeseriesMetadataCount >= MAX_TIMESERIES_METADATA_COUNT) {
+    if (currentTimeseriesMetadataCount >= timeseriesBatchReadNumber) {
       currentTimeseriesMetadataCount = 0;
     }
 
@@ -157,6 +179,7 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
           .computeIfAbsent(currentDeviceId, k -> new ArrayList<>())
           .addAll(deserializeTimeseriesMetadata());
     } else {
+      currentEndOffset = endOffset;
       reader.position(metadataIndexEntry.getOffset());
       timeseriesMetadataMap
           .computeIfAbsent(currentDeviceId, k -> new ArrayList<>())
@@ -167,7 +190,7 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
   private List<TimeseriesMetadata> deserializeTimeseriesMetadata() {
     final List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
     while (currentBuffer.hasRemaining()
-        && currentTimeseriesMetadataCount < MAX_TIMESERIES_METADATA_COUNT) {
+        && currentTimeseriesMetadataCount < timeseriesBatchReadNumber) {
       timeseriesMetadataList.add(
           TimeseriesMetadata.deserializeFrom(currentBuffer, 
needChunkMetadata));
       currentTimeseriesMetadataCount++;
@@ -179,11 +202,16 @@ public class 
TsFileSequenceReaderTimeseriesMetadataIterator
       throws IOException {
     final List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+    // Stop at the per-instance batch size, like every other read path of this iterator.
     while (reader.position() < endOffset
-        && currentTimeseriesMetadataCount < MAX_TIMESERIES_METADATA_COUNT) {
+        && currentTimeseriesMetadataCount < timeseriesBatchReadNumber) {
       timeseriesMetadataList.add(
           TimeseriesMetadata.deserializeFrom(reader.tsFileInput, 
needChunkMetadata));
       currentTimeseriesMetadataCount++;
     }
+    // The whole range has been consumed: clear the marker so next() does not resume from it.
+    if (reader.position() >= endOffset) {
+      currentEndOffset = Long.MIN_VALUE;
+    }
     return timeseriesMetadataList;
   }
 
diff --git 
a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIteratorTest.java
 
b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIteratorTest.java
index 54ddaabf03c..5648473662b 100644
--- 
a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIteratorTest.java
+++ 
b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIteratorTest.java
@@ -36,8 +36,8 @@ public class 
TsFileSequenceReaderTimeseriesMetadataIteratorTest {
 
   @Before
   public void before() throws IOException {
-    // create 2020 timeseries, 101 measurements per device.
-    FileGenerator.generateFile(100, 20, 101);
+    // create 4040 timeseries, 101 measurements per device.
+    FileGenerator.generateFile(100, 40, 101);
     TsFileSequenceReader fileReader = new TsFileSequenceReader(FILE_PATH);
     tsFile = new TsFileReader(fileReader);
   }
@@ -55,8 +55,8 @@ public class 
TsFileSequenceReaderTimeseriesMetadataIteratorTest {
         new TsFileSequenceReaderTimeseriesMetadataIterator(fileReader, false);
 
     Assert.assertTrue(iterator.hasNext());
-    Assert.assertEquals(2000, 
iterator.next().values().stream().mapToLong(List::size).sum());
+    Assert.assertEquals(4000, 
iterator.next().values().stream().mapToLong(List::size).sum());
     Assert.assertTrue(iterator.hasNext());
-    Assert.assertEquals(20, 
iterator.next().values().stream().mapToLong(List::size).sum());
+    Assert.assertEquals(40, 
iterator.next().values().stream().mapToLong(List::size).sum());
   }
 }


Reply via email to