This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 926a086680c [To dev/1.3] Load: Flush ChunkGroup to a temporary file and cache TableSchema in advance to reduce memory allocation && Add IT test about Load file across multiple time partitions (#16225) (#16253) (#16273)
926a086680c is described below

commit 926a086680c805b4d83c052bd7735a50648faaf9
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Aug 27 14:15:06 2025 +0800

    [To dev/1.3] Load: Flush ChunkGroup to a temporary file and cache TableSchema in advance to reduce memory allocation && Add IT test about Load file across multiple time partitions (#16225) (#16253) (#16273)
    
    * Load: Flush ChunkGroup to a temporary file and cache TableSchema in advance to reduce memory allocation (#16225)
    
    * Load: Flush ChunkGroup to a temporary file and cache TableSchema in advance to reduce memory allocation
    
    * update
    
    * update
    
    * update
    
    * update
    
    * fix
    
    * fix
    
    * update
    
    * update
    
    (cherry picked from commit 2a46823cd90914010c9632676476e149b124ba32)
    
    * Load: Add IT test about Load file across multiple time partitions (#16253)
    
    * add IT
    
    * add IT
    
    (cherry picked from commit 67c450be53f83201c98569f4e75e45d5fa3072aa)
    
    # Conflicts:
    #       iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
    #       iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
    
    * fix
    
    * fix
---
 .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java  | 83 ++++++++++++++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  7 ++
 .../db/storageengine/load/LoadTsFileManager.java   | 48 +++++++++++--
 .../load/splitter/AlignedChunkData.java            |  7 +-
 .../load/splitter/NonAlignedChunkData.java         |  4 ++
 6 files changed, 152 insertions(+), 7 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 12247e277ab..71b17800163 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -222,6 +222,9 @@ public class IoTDBLoadTsFileIT {
       generator.generateData(SchemaConfig.DEVICE_2, 10000, PARTITION_INTERVAL 
/ 10_000, false);
       generator.generateData(SchemaConfig.DEVICE_3, 10000, PARTITION_INTERVAL 
/ 10_000, false);
       generator.generateData(SchemaConfig.DEVICE_4, 10000, PARTITION_INTERVAL 
/ 10_000, true);
+      for (int i = 0; i < 10000; i++) {
+        generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL - 
10, true);
+      }
       writtenPoint2 = generator.getTotalNumber();
     }
 
@@ -253,6 +256,86 @@ public class IoTDBLoadTsFileIT {
     }
   }
 
+  @Test
+  public void testLoadAcrossMultipleTimePartitions() throws Exception {
+    registerSchema();
+
+    final long writtenPoint1;
+    // device 0, device 1, sg 0
+    try (final TsFileGenerator generator =
+        new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
+      generator.registerTimeseries(
+          SchemaConfig.DEVICE_0,
+          Arrays.asList(
+              SchemaConfig.MEASUREMENT_00,
+              SchemaConfig.MEASUREMENT_01,
+              SchemaConfig.MEASUREMENT_02,
+              SchemaConfig.MEASUREMENT_03,
+              SchemaConfig.MEASUREMENT_04,
+              SchemaConfig.MEASUREMENT_05,
+              SchemaConfig.MEASUREMENT_06,
+              SchemaConfig.MEASUREMENT_07));
+      generator.registerAlignedTimeseries(
+          SchemaConfig.DEVICE_1,
+          Arrays.asList(
+              SchemaConfig.MEASUREMENT_10,
+              SchemaConfig.MEASUREMENT_11,
+              SchemaConfig.MEASUREMENT_12,
+              SchemaConfig.MEASUREMENT_13,
+              SchemaConfig.MEASUREMENT_14,
+              SchemaConfig.MEASUREMENT_15,
+              SchemaConfig.MEASUREMENT_16,
+              SchemaConfig.MEASUREMENT_17));
+      for (int i = 0; i < 1000; i++) {
+        generator.generateData(SchemaConfig.DEVICE_0, 1, PARTITION_INTERVAL - 
10, false);
+      }
+      for (int i = 0; i < 1000; i++) {
+        generator.generateData(SchemaConfig.DEVICE_1, 1, PARTITION_INTERVAL - 
10, true);
+      }
+      writtenPoint1 = generator.getTotalNumber();
+    }
+
+    final long writtenPoint2;
+    // device 2, device 3, device4, sg 1
+    try (final TsFileGenerator generator =
+        new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) {
+      generator.registerTimeseries(
+          SchemaConfig.DEVICE_2, 
Collections.singletonList(SchemaConfig.MEASUREMENT_20));
+      generator.registerTimeseries(
+          SchemaConfig.DEVICE_3, 
Collections.singletonList(SchemaConfig.MEASUREMENT_30));
+      generator.registerAlignedTimeseries(
+          SchemaConfig.DEVICE_4, 
Collections.singletonList(SchemaConfig.MEASUREMENT_40));
+      for (int i = 0; i < 1000; i++) {
+        generator.generateData(SchemaConfig.DEVICE_2, 1, PARTITION_INTERVAL - 
10, false);
+      }
+      for (int i = 0; i < 1000; i++) {
+        generator.generateData(SchemaConfig.DEVICE_3, 1, PARTITION_INTERVAL - 
10, false);
+      }
+      for (int i = 0; i < 1000; i++) {
+        generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL - 
10, true);
+      }
+      writtenPoint2 = generator.getTotalNumber();
+    }
+
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+
+      statement.execute(String.format("load \"%s\" sglevel=2", 
tmpDir.getAbsolutePath()));
+
+      try (final ResultSet resultSet =
+          statement.executeQuery("select count(*) from root.sg.** group by 
level=1,2")) {
+        if (resultSet.next()) {
+          long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)");
+          Assert.assertEquals(writtenPoint1, sg1Count);
+          long sg2Count = resultSet.getLong("count(root.sg.test_1.*.*)");
+          Assert.assertEquals(writtenPoint2, sg2Count);
+        } else {
+          Assert.fail("This ResultSet is empty.");
+        }
+      }
+    }
+  }
+
   @Test
   public void testLoadWithExtendTemplate() throws Exception {
     final long writtenPoint1;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1a3f87a1a83..024debf4bf9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1166,6 +1166,8 @@ public class IoTDBConfig {
 
   private boolean loadActiveListeningEnable = true;
 
+  private long loadMeasurementIdCacheSizeInBytes = 2 * 1024 * 1024L; // 2MB
+
   private String[] loadActiveListeningDirs =
       new String[] {
         IoTDBConstant.EXT_FOLDER_NAME
@@ -4185,6 +4187,14 @@ public class IoTDBConfig {
     this.loadActiveListeningEnable = loadActiveListeningEnable;
   }
 
+  public long getLoadMeasurementIdCacheSizeInBytes() {
+    return loadMeasurementIdCacheSizeInBytes;
+  }
+
+  public void setLoadMeasurementIdCacheSizeInBytes(long 
loadMeasurementIdCacheSizeInBytes) {
+    this.loadMeasurementIdCacheSizeInBytes = loadMeasurementIdCacheSizeInBytes;
+  }
+
   public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
     this.pipeReceiverFileDirs = pipeReceiverFileDirs;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index b8b29fad99c..e031fe959ec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2498,6 +2498,12 @@ public class IoTDBDescriptor {
             ? conf.getLoadActiveListeningCheckIntervalSeconds()
             : loadActiveListeningCheckIntervalSeconds);
 
+    conf.setLoadMeasurementIdCacheSizeInBytes(
+        Long.parseLong(
+            properties.getProperty(
+                "load_measurement_id_cache_size_in_bytes",
+                Long.toString(conf.getLoadMeasurementIdCacheSizeInBytes()))));
+
     conf.setLoadActiveListeningMaxThreadNum(
         Integer.parseInt(
             properties.getProperty(
@@ -2562,6 +2568,7 @@ public class IoTDBDescriptor {
                 "load_active_listening_enable",
                 ConfigurationFileUtils.getConfigurationDefaultValue(
                     "load_active_listening_enable"))));
+
     conf.setLoadActiveListeningDirs(
         Arrays.stream(
                 properties
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 58d10a87697..63a72be4af9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -52,6 +52,8 @@ import 
org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.write.PageException;
@@ -74,10 +76,12 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -105,6 +109,12 @@ public class LoadTsFileManager {
       new AtomicReference<>(CONFIG.getLoadTsFileDirs());
   private static final AtomicReference<FolderManager> FOLDER_MANAGER = new 
AtomicReference<>();
 
+  public static final Cache<String, String> MEASUREMENT_ID_CACHE =
+      Caffeine.newBuilder()
+          .maximumWeight(CONFIG.getLoadMeasurementIdCacheSizeInBytes())
+          .weigher((String k, String v) -> v.length())
+          .build();
+
   private final Map<String, TsFileWriterManager> uuid2WriterManager = new 
ConcurrentHashMap<>();
 
   private final Map<String, CleanupTask> uuid2CleanupTask = new 
ConcurrentHashMap<>();
@@ -376,6 +386,7 @@ public class LoadTsFileManager {
     private Map<DataPartitionInfo, TsFileResource> dataPartition2Resource;
     private Map<DataPartitionInfo, String> dataPartition2LastDevice;
     private Map<DataPartitionInfo, ModificationFile> 
dataPartition2ModificationFile;
+    private Map<String, Set<DataPartitionInfo>> device2Partition;
     private boolean isClosed;
 
     private TsFileWriterManager(File taskDir) {
@@ -384,6 +395,7 @@ public class LoadTsFileManager {
       this.dataPartition2Resource = new HashMap<>();
       this.dataPartition2LastDevice = new HashMap<>();
       this.dataPartition2ModificationFile = new HashMap<>();
+      device2Partition = new HashMap<>();
       this.isClosed = false;
 
       clearDir(taskDir);
@@ -446,14 +458,36 @@ public class LoadTsFileManager {
         dataPartition2Resource.put(partitionInfo, resource);
       }
       TsFileIOWriter writer = dataPartition2Writer.get(partitionInfo);
-      if 
(!chunkData.getDevice().equals(dataPartition2LastDevice.getOrDefault(partitionInfo,
 ""))) {
-        if (dataPartition2LastDevice.containsKey(partitionInfo)) {
-          writer.endChunkGroup();
-          writer.checkMetadataSizeAndMayFlush();
+
+      String device = chunkData.getDevice();
+      String lastDevice = dataPartition2LastDevice.get(partitionInfo);
+
+      if (!Objects.equals(device, lastDevice)) {
+        if (lastDevice != null && device2Partition.containsKey(lastDevice)) {
+          Set<DataPartitionInfo> partitions = device2Partition.get(lastDevice);
+          for (DataPartitionInfo partition : partitions) {
+            TsFileIOWriter w = dataPartition2Writer.get(partition);
+            if (dataPartition2LastDevice.containsKey(partition) && w != null) {
+              w.endChunkGroup();
+              w.checkMetadataSizeAndMayFlush();
+            }
+          }
+          device2Partition.remove(lastDevice);
         }
-        writer.startChunkGroup(new PlainDeviceID(chunkData.getDevice()));
-        dataPartition2LastDevice.put(partitionInfo, chunkData.getDevice());
+
+        if (writer.isWritingChunkGroup()) {
+          LOGGER.warn(
+              "Writer {} for partition {} is already writing chunk group for 
device {}, but the last device is {}. ",
+              writer.getFile().getAbsolutePath(),
+              partitionInfo,
+              device,
+              lastDevice);
+        }
+        writer.startChunkGroup(new PlainDeviceID(device));
+        dataPartition2LastDevice.put(partitionInfo, device);
+        device2Partition.computeIfAbsent(device, k -> new 
HashSet<>()).add(partitionInfo);
       }
+
       chunkData.writeToFileWriter(writer);
     }
 
@@ -672,8 +706,10 @@ public class LoadTsFileManager {
         LOGGER.warn(MESSAGE_DELETE_FAIL, taskDir.getPath(), e);
       }
       dataPartition2Writer = null;
+      dataPartition2Resource = null;
       dataPartition2LastDevice = null;
       dataPartition2ModificationFile = null;
+      device2Partition = null;
       isClosed = true;
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index d898563fcf2..86bd02c9947 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -54,6 +54,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
+import static 
org.apache.iotdb.db.storageengine.load.LoadTsFileManager.MEASUREMENT_ID_CACHE;
+
 public class AlignedChunkData implements ChunkData {
   protected static final int DEFAULT_INT32 = 0;
   protected static final long DEFAULT_INT64 = 0L;
@@ -447,7 +449,10 @@ public class AlignedChunkData implements ChunkData {
     final List<ChunkHeader> chunkHeaderList = new ArrayList<>();
     for (int i = 0; i < chunkHeaderListSize; i++) {
       final byte chunkType = ReadWriteIOUtils.readByte(stream);
-      chunkHeaderList.add(ChunkHeader.deserializeFrom(stream, chunkType));
+      ChunkHeader chunkHeader = ChunkHeader.deserializeFrom(stream, chunkType);
+      String measurementID = chunkHeader.getMeasurementID();
+      chunkHeader.setMeasurementID(MEASUREMENT_ID_CACHE.get(measurementID, m 
-> m));
+      chunkHeaderList.add(chunkHeader);
     }
     final List<Integer> pageNumbers = new ArrayList<>();
     if (needDecodeChunk) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
index a2b6577d327..2d8a75052ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
@@ -44,6 +44,8 @@ import java.io.InputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 
+import static 
org.apache.iotdb.db.storageengine.load.LoadTsFileManager.MEASUREMENT_ID_CACHE;
+
 public class NonAlignedChunkData implements ChunkData {
 
   private final TTimePartitionSlot timePartitionSlot;
@@ -284,6 +286,8 @@ public class NonAlignedChunkData implements ChunkData {
     final boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
     final byte chunkType = ReadWriteIOUtils.readByte(stream);
     final ChunkHeader chunkHeader = ChunkHeader.deserializeFrom(stream, 
chunkType);
+    String measurementID = chunkHeader.getMeasurementID();
+    chunkHeader.setMeasurementID(MEASUREMENT_ID_CACHE.get(measurementID, m -> 
m));
     int pageNumber = 0;
     if (needDecodeChunk) {
       pageNumber = ReadWriteIOUtils.readInt(stream);

Reply via email to