This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 926a086680c [To dev/1.3] Load: Flush ChunkGroup to a temporary file
and cache TableSchema in advance to reduce memory allocation && Add IT test
about Load file across multiple time partitions (#16225) (#16253) (#16273)
926a086680c is described below
commit 926a086680c805b4d83c052bd7735a50648faaf9
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Aug 27 14:15:06 2025 +0800
[To dev/1.3] Load: Flush ChunkGroup to a temporary file and cache
TableSchema in advance to reduce memory allocation && Add IT test about Load
file across multiple time partitions (#16225) (#16253) (#16273)
* Load: Flush ChunkGroup to a temporary file and cache TableSchema in
advance to reduce memory allocation (#16225)
* Load: Flush ChunkGroup to a temporary file and cache TableSchema in
advance to reduce memory allocation
* update
* update
* update
* update
* fix
* fix
* update
* update
(cherry picked from commit 2a46823cd90914010c9632676476e149b124ba32)
* Load: Add IT test about Load file across multiple time partitions (#16253)
* add IT
* add IT
(cherry picked from commit 67c450be53f83201c98569f4e75e45d5fa3072aa)
# Conflicts:
#	iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
#	iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
* fix
* fix
---
.../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 83 ++++++++++++++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 7 ++
.../db/storageengine/load/LoadTsFileManager.java | 48 +++++++++++--
.../load/splitter/AlignedChunkData.java | 7 +-
.../load/splitter/NonAlignedChunkData.java | 4 ++
6 files changed, 152 insertions(+), 7 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 12247e277ab..71b17800163 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -222,6 +222,9 @@ public class IoTDBLoadTsFileIT {
generator.generateData(SchemaConfig.DEVICE_2, 10000, PARTITION_INTERVAL
/ 10_000, false);
generator.generateData(SchemaConfig.DEVICE_3, 10000, PARTITION_INTERVAL
/ 10_000, false);
generator.generateData(SchemaConfig.DEVICE_4, 10000, PARTITION_INTERVAL
/ 10_000, true);
+ for (int i = 0; i < 10000; i++) {
+ generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL -
10, true);
+ }
writtenPoint2 = generator.getTotalNumber();
}
@@ -253,6 +256,86 @@ public class IoTDBLoadTsFileIT {
}
}
+  /**
+   * Loads two TsFiles whose devices each hold many points spread over multiple time partitions
+   * and checks that every written point is counted under the expected storage group afterwards.
+   */
+  @Test
+  public void testLoadAcrossMultipleTimePartitions() throws Exception {
+    registerSchema();
+
+    // Each device receives many single-point batches generated with step PARTITION_INTERVAL - 10
+    // (presumably the third generateData argument is the time step between points — TODO
+    // confirm), so each device's data spans many time partitions and must be split on load.
+    final long writtenPointSg0;
+    // device 0, device 1 -> sg 0 (root.sg.test_0)
+    try (final TsFileGenerator generator =
+        new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
+      generator.registerTimeseries(
+          SchemaConfig.DEVICE_0,
+          Arrays.asList(
+              SchemaConfig.MEASUREMENT_00,
+              SchemaConfig.MEASUREMENT_01,
+              SchemaConfig.MEASUREMENT_02,
+              SchemaConfig.MEASUREMENT_03,
+              SchemaConfig.MEASUREMENT_04,
+              SchemaConfig.MEASUREMENT_05,
+              SchemaConfig.MEASUREMENT_06,
+              SchemaConfig.MEASUREMENT_07));
+      generator.registerAlignedTimeseries(
+          SchemaConfig.DEVICE_1,
+          Arrays.asList(
+              SchemaConfig.MEASUREMENT_10,
+              SchemaConfig.MEASUREMENT_11,
+              SchemaConfig.MEASUREMENT_12,
+              SchemaConfig.MEASUREMENT_13,
+              SchemaConfig.MEASUREMENT_14,
+              SchemaConfig.MEASUREMENT_15,
+              SchemaConfig.MEASUREMENT_16,
+              SchemaConfig.MEASUREMENT_17));
+      for (int i = 0; i < 1000; i++) {
+        generator.generateData(SchemaConfig.DEVICE_0, 1, PARTITION_INTERVAL - 10, false);
+      }
+      for (int i = 0; i < 1000; i++) {
+        generator.generateData(SchemaConfig.DEVICE_1, 1, PARTITION_INTERVAL - 10, true);
+      }
+      writtenPointSg0 = generator.getTotalNumber();
+    }
+
+    final long writtenPointSg1;
+    // device 2, device 3, device 4 -> sg 1 (root.sg.test_1)
+    try (final TsFileGenerator generator =
+        new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) {
+      generator.registerTimeseries(
+          SchemaConfig.DEVICE_2, Collections.singletonList(SchemaConfig.MEASUREMENT_20));
+      generator.registerTimeseries(
+          SchemaConfig.DEVICE_3, Collections.singletonList(SchemaConfig.MEASUREMENT_30));
+      generator.registerAlignedTimeseries(
+          SchemaConfig.DEVICE_4, Collections.singletonList(SchemaConfig.MEASUREMENT_40));
+      for (int i = 0; i < 1000; i++) {
+        generator.generateData(SchemaConfig.DEVICE_2, 1, PARTITION_INTERVAL - 10, false);
+      }
+      for (int i = 0; i < 1000; i++) {
+        generator.generateData(SchemaConfig.DEVICE_3, 1, PARTITION_INTERVAL - 10, false);
+      }
+      for (int i = 0; i < 1000; i++) {
+        generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL - 10, true);
+      }
+      writtenPointSg1 = generator.getTotalNumber();
+    }
+
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+
+      statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath()));
+
+      // One row is expected, with one count column per storage group (level 1, 2 grouping).
+      try (final ResultSet resultSet =
+          statement.executeQuery("select count(*) from root.sg.** group by level=1,2")) {
+        Assert.assertTrue("This ResultSet is empty.", resultSet.next());
+        final long sg0Count = resultSet.getLong("count(root.sg.test_0.*.*)");
+        Assert.assertEquals(writtenPointSg0, sg0Count);
+        final long sg1Count = resultSet.getLong("count(root.sg.test_1.*.*)");
+        Assert.assertEquals(writtenPointSg1, sg1Count);
+      }
+    }
+  }
+
@Test
public void testLoadWithExtendTemplate() throws Exception {
final long writtenPoint1;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1a3f87a1a83..024debf4bf9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1166,6 +1166,8 @@ public class IoTDBConfig {
private boolean loadActiveListeningEnable = true;
+ private long loadMeasurementIdCacheSizeInBytes = 2 * 1024 * 1024L; // 2MB
+
private String[] loadActiveListeningDirs =
new String[] {
IoTDBConstant.EXT_FOLDER_NAME
@@ -4185,6 +4187,14 @@ public class IoTDBConfig {
this.loadActiveListeningEnable = loadActiveListeningEnable;
}
+  /**
+   * Returns the upper bound on the total weight of the load measurement-ID cache, read from the
+   * {@code load_measurement_id_cache_size_in_bytes} property (2 MB unless configured otherwise).
+   *
+   * @return the configured cache bound
+   */
+  public long getLoadMeasurementIdCacheSizeInBytes() {
+    return this.loadMeasurementIdCacheSizeInBytes;
+  }
+
+  /**
+   * Overrides the upper bound on the total weight of the load measurement-ID cache.
+   *
+   * <p>NOTE(review): the value is consumed as a Caffeine {@code maximumWeight}, which rejects
+   * negative values; no validation happens here — confirm callers only pass non-negative bounds.
+   *
+   * @param loadMeasurementIdCacheSizeInBytes the new cache bound
+   */
+  public void setLoadMeasurementIdCacheSizeInBytes(final long loadMeasurementIdCacheSizeInBytes) {
+    this.loadMeasurementIdCacheSizeInBytes = loadMeasurementIdCacheSizeInBytes;
+  }
+
public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
this.pipeReceiverFileDirs = pipeReceiverFileDirs;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index b8b29fad99c..e031fe959ec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2498,6 +2498,12 @@ public class IoTDBDescriptor {
? conf.getLoadActiveListeningCheckIntervalSeconds()
: loadActiveListeningCheckIntervalSeconds);
+ conf.setLoadMeasurementIdCacheSizeInBytes(
+ Long.parseLong(
+ properties.getProperty(
+ "load_measurement_id_cache_size_in_bytes",
+ Long.toString(conf.getLoadMeasurementIdCacheSizeInBytes()))));
+
conf.setLoadActiveListeningMaxThreadNum(
Integer.parseInt(
properties.getProperty(
@@ -2562,6 +2568,7 @@ public class IoTDBDescriptor {
"load_active_listening_enable",
ConfigurationFileUtils.getConfigurationDefaultValue(
"load_active_listening_enable"))));
+
conf.setLoadActiveListeningDirs(
Arrays.stream(
properties
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 58d10a87697..63a72be4af9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -52,6 +52,8 @@ import
org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.metrics.utils.MetricLevel;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.PageException;
@@ -74,10 +76,12 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
@@ -105,6 +109,12 @@ public class LoadTsFileManager {
new AtomicReference<>(CONFIG.getLoadTsFileDirs());
private static final AtomicReference<FolderManager> FOLDER_MANAGER = new
AtomicReference<>();
+ /**
+ * Shared pool of measurement-ID strings used while deserializing chunk headers for load.
+ * Deserializers replace each header's ID with {@code MEASUREMENT_ID_CACHE.get(id, m -> m)},
+ * so equal IDs reuse the single cached String instance (while its entry stays cached) instead
+ * of each deserialized header keeping its own copy.
+ *
+ * <p>The total weight is bounded by {@code load_measurement_id_cache_size_in_bytes}, read once
+ * from CONFIG when this class is initialized; later config changes do not resize this cache.
+ * Each entry weighs {@code v.length()}, i.e. its UTF-16 char count. NOTE(review): that is only
+ * an approximation of bytes — confirm the "InBytes" naming is intended.
+ *
+ * <p>The weigher's parameters are typed explicitly because {@code Caffeine.newBuilder()} returns
+ * {@code Caffeine<Object, Object>}; an untyped lambda would see {@code Object} values.
+ */
+ public static final Cache<String, String> MEASUREMENT_ID_CACHE =
+ Caffeine.newBuilder()
+ .maximumWeight(CONFIG.getLoadMeasurementIdCacheSizeInBytes())
+ .weigher((String k, String v) -> v.length())
+ .build();
+
private final Map<String, TsFileWriterManager> uuid2WriterManager = new
ConcurrentHashMap<>();
private final Map<String, CleanupTask> uuid2CleanupTask = new
ConcurrentHashMap<>();
@@ -376,6 +386,7 @@ public class LoadTsFileManager {
private Map<DataPartitionInfo, TsFileResource> dataPartition2Resource;
private Map<DataPartitionInfo, String> dataPartition2LastDevice;
private Map<DataPartitionInfo, ModificationFile>
dataPartition2ModificationFile;
+ private Map<String, Set<DataPartitionInfo>> device2Partition;
private boolean isClosed;
private TsFileWriterManager(File taskDir) {
@@ -384,6 +395,7 @@ public class LoadTsFileManager {
this.dataPartition2Resource = new HashMap<>();
this.dataPartition2LastDevice = new HashMap<>();
this.dataPartition2ModificationFile = new HashMap<>();
+ device2Partition = new HashMap<>();
this.isClosed = false;
clearDir(taskDir);
@@ -446,14 +458,36 @@ public class LoadTsFileManager {
dataPartition2Resource.put(partitionInfo, resource);
}
TsFileIOWriter writer = dataPartition2Writer.get(partitionInfo);
- if
(!chunkData.getDevice().equals(dataPartition2LastDevice.getOrDefault(partitionInfo,
""))) {
- if (dataPartition2LastDevice.containsKey(partitionInfo)) {
- writer.endChunkGroup();
- writer.checkMetadataSizeAndMayFlush();
+
+ String device = chunkData.getDevice();
+ String lastDevice = dataPartition2LastDevice.get(partitionInfo);
+
+ if (!Objects.equals(device, lastDevice)) {
+ if (lastDevice != null && device2Partition.containsKey(lastDevice)) {
+ Set<DataPartitionInfo> partitions = device2Partition.get(lastDevice);
+ for (DataPartitionInfo partition : partitions) {
+ TsFileIOWriter w = dataPartition2Writer.get(partition);
+ if (dataPartition2LastDevice.containsKey(partition) && w != null) {
+ w.endChunkGroup();
+ w.checkMetadataSizeAndMayFlush();
+ }
+ }
+ device2Partition.remove(lastDevice);
}
- writer.startChunkGroup(new PlainDeviceID(chunkData.getDevice()));
- dataPartition2LastDevice.put(partitionInfo, chunkData.getDevice());
+
+ if (writer.isWritingChunkGroup()) {
+ LOGGER.warn(
+ "Writer {} for partition {} is already writing chunk group for
device {}, but the last device is {}. ",
+ writer.getFile().getAbsolutePath(),
+ partitionInfo,
+ device,
+ lastDevice);
+ }
+ writer.startChunkGroup(new PlainDeviceID(device));
+ dataPartition2LastDevice.put(partitionInfo, device);
+ device2Partition.computeIfAbsent(device, k -> new
HashSet<>()).add(partitionInfo);
}
+
chunkData.writeToFileWriter(writer);
}
@@ -672,8 +706,10 @@ public class LoadTsFileManager {
LOGGER.warn(MESSAGE_DELETE_FAIL, taskDir.getPath(), e);
}
dataPartition2Writer = null;
+ dataPartition2Resource = null;
dataPartition2LastDevice = null;
dataPartition2ModificationFile = null;
+ device2Partition = null;
isClosed = true;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index d898563fcf2..86bd02c9947 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -54,6 +54,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import static
org.apache.iotdb.db.storageengine.load.LoadTsFileManager.MEASUREMENT_ID_CACHE;
+
public class AlignedChunkData implements ChunkData {
protected static final int DEFAULT_INT32 = 0;
protected static final long DEFAULT_INT64 = 0L;
@@ -447,7 +449,10 @@ public class AlignedChunkData implements ChunkData {
final List<ChunkHeader> chunkHeaderList = new ArrayList<>();
for (int i = 0; i < chunkHeaderListSize; i++) {
final byte chunkType = ReadWriteIOUtils.readByte(stream);
- chunkHeaderList.add(ChunkHeader.deserializeFrom(stream, chunkType));
+ ChunkHeader chunkHeader = ChunkHeader.deserializeFrom(stream, chunkType);
+ String measurementID = chunkHeader.getMeasurementID();
+ chunkHeader.setMeasurementID(MEASUREMENT_ID_CACHE.get(measurementID, m
-> m));
+ chunkHeaderList.add(chunkHeader);
}
final List<Integer> pageNumbers = new ArrayList<>();
if (needDecodeChunk) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
index a2b6577d327..2d8a75052ff 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
@@ -44,6 +44,8 @@ import java.io.InputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import static
org.apache.iotdb.db.storageengine.load.LoadTsFileManager.MEASUREMENT_ID_CACHE;
+
public class NonAlignedChunkData implements ChunkData {
private final TTimePartitionSlot timePartitionSlot;
@@ -284,6 +286,8 @@ public class NonAlignedChunkData implements ChunkData {
final boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
final byte chunkType = ReadWriteIOUtils.readByte(stream);
final ChunkHeader chunkHeader = ChunkHeader.deserializeFrom(stream,
chunkType);
+ String measurementID = chunkHeader.getMeasurementID();
+ chunkHeader.setMeasurementID(MEASUREMENT_ID_CACHE.get(measurementID, m ->
m));
int pageNumber = 0;
if (needDecodeChunk) {
pageNumber = ReadWriteIOUtils.readInt(stream);