This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 699c90eff9d Load: add the function of transferring too many time
partitions of files to tablets and fixed the problem that the data written to
tablets is more than expected. (#16320)
699c90eff9d is described below
commit 699c90eff9d1c4969935dc94a2b4479e0d0e5d61
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Sep 11 10:23:30 2025 +0800
Load: add the function of transferring too many time partitions of files to
tablets and fixed the problem that the data written to tablets is more than
expected. (#16320)
* update
* update
* update
* update
* update
* update
* simplify
* Update pom.xml
* fix
* fix
* fix IT
---------
Co-authored-by: Caideyipi <[email protected]>
---
.../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 5 ++++-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 23 ++++++++++++++++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++++++
...ileInsertionEventTableParserTabletIterator.java | 25 ++++++++++++++++++++--
.../planner/plan/node/write/InsertTabletNode.java | 4 ++--
.../plan/statement/crud/InsertTabletStatement.java | 18 ++++++++++------
.../converter/LoadTsFileDataTypeConverter.java | 11 ++++++----
.../load/splitter/TsFileSplitter.java | 21 ++++++++++++++++++
.../planner/node/write/WritePlanNodeSplitTest.java | 3 +++
9 files changed, 101 insertions(+), 15 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 7d5f7be8a1e..c838bc843ae 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -83,11 +83,14 @@ public class IoTDBLoadTsFileIT {
tmpDir = new File(Files.createTempDirectory("load").toUri());
EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(PARTITION_INTERVAL);
EnvFactory.getEnv().getConfig().getCommonConfig().setEnforceStrongPassword(false);
+
EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
+
EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("1:10:1:1:1:1");
EnvFactory.getEnv()
.getConfig()
.getDataNodeConfig()
.setConnectionTimeoutInMS(connectionTimeoutInMS)
.setLoadTsFileAnalyzeSchemaMemorySizeInBytes(loadTsFileAnalyzeSchemaMemorySizeInBytes);
+
EnvFactory.getEnv().initClusterEnvironment();
}
@@ -224,7 +227,7 @@ public class IoTDBLoadTsFileIT {
generator.generateData(SchemaConfig.DEVICE_2, 10000, PARTITION_INTERVAL
/ 10_000, false);
generator.generateData(SchemaConfig.DEVICE_3, 10000, PARTITION_INTERVAL
/ 10_000, false);
generator.generateData(SchemaConfig.DEVICE_4, 10000, PARTITION_INTERVAL
/ 10_000, true);
- for (int i = 0; i < 10000; i++) {
+ for (int i = 0; i < 1000; i++) {
generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL -
10, true);
}
writtenPoint2 = generator.getTotalNumber();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ecb18b3c575..55c5410a555 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1108,6 +1108,8 @@ public class IoTDBConfig {
private long loadMeasurementIdCacheSizeInBytes = 2 * 1024 * 1024L; // 2MB
+ private int loadTsFileSpiltPartitionMaxSize = 10;
+
private String[] loadActiveListeningDirs =
new String[] {
IoTDBConstant.EXT_FOLDER_NAME
@@ -3997,6 +3999,27 @@ public class IoTDBConfig {
this.pipeReceiverFileDirs = pipeReceiverFileDirs;
}
+ public int getLoadTsFileSpiltPartitionMaxSize() {
+ return loadTsFileSpiltPartitionMaxSize;
+ }
+
+ public void setLoadTsFileSpiltPartitionMaxSize(int
loadTsFileSpiltPartitionMaxSize) {
+ if (loadTsFileSpiltPartitionMaxSize <= 0) {
+ throw new IllegalArgumentException(
+ "loadTsFileSpiltPartitionMaxSize should be greater than or equal to
0");
+ }
+
+ if (this.loadTsFileSpiltPartitionMaxSize ==
loadTsFileSpiltPartitionMaxSize) {
+ return;
+ }
+
+ logger.info(
+ "Set loadTsFileSpiltPartitionMaxSize from {} to {}",
+ this.loadTsFileSpiltPartitionMaxSize,
+ loadTsFileSpiltPartitionMaxSize);
+ this.loadTsFileSpiltPartitionMaxSize = loadTsFileSpiltPartitionMaxSize;
+ }
+
public String[] getPipeReceiverFileDirs() {
return (Objects.isNull(this.pipeReceiverFileDirs) ||
this.pipeReceiverFileDirs.length == 0)
? new String[] {systemDir + File.separator + "pipe" + File.separator +
"receiver"}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8dc51d1964f..2aa679e6db8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2404,6 +2404,12 @@ public class IoTDBDescriptor {
properties.getProperty(
"load_active_listening_fail_dir",
ConfigurationFileUtils.getConfigurationDefaultValue("load_active_listening_fail_dir")));
+
+ conf.setLoadTsFileSpiltPartitionMaxSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "load_tsfile_split_partition_max_size",
+ Integer.toString(conf.getLoadTsFileSpiltPartitionMaxSize()))));
}
private void loadPipeHotModifiedProp(TrimProperties properties) throws
IOException {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
index 1665a8b3e0e..746d9d5b4a0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
@@ -40,6 +40,7 @@ import org.apache.tsfile.read.controller.IMetadataQuerier;
import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.tsfile.read.reader.IChunkReader;
import org.apache.tsfile.read.reader.chunk.TableChunkReader;
+import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsPrimitiveType;
@@ -394,6 +395,13 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T
for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
if (primitiveType == null) {
+ switch (dataTypeList.get(i)) {
+ case TEXT:
+ case BLOB:
+ case STRING:
+ tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues());
+ }
+ tablet.getBitMaps()[i].mark(rowIndex);
continue;
}
@@ -420,7 +428,11 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T
case TEXT:
case BLOB:
case STRING:
- tablet.addValue(rowIndex, i, primitiveType.getBinary().getValues());
+ Binary binary = primitiveType.getBinary();
+ tablet.addValue(
+ rowIndex,
+ i,
+ binary.getValues() == null ? Binary.EMPTY_VALUE.getValues() :
binary.getValues());
break;
default:
throw new UnSupportedDataTypeException("UnSupported" +
primitiveType.getDataType());
@@ -431,11 +443,20 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T
private void fillDeviceIdColumns(
final IDeviceID deviceID, final Tablet tablet, final int rowIndex) {
final String[] deviceIdSegments = (String[]) deviceID.getSegments();
- for (int i = 1, totalColumns = deviceIdSegments.length; i < totalColumns;
i++) {
+ int i = 1;
+ for (int totalColumns = deviceIdSegments.length; i < totalColumns; i++) {
if (deviceIdSegments[i] == null) {
+ tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues());
+ tablet.getBitMaps()[i - 1].mark(rowIndex);
continue;
}
tablet.addValue(rowIndex, i - 1, deviceIdSegments[i]);
}
+
+ while (i <= deviceIdSize) {
+ tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues());
+ tablet.getBitMaps()[i - 1].mark(rowIndex);
+ i++;
+ }
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index be0b00931fd..edbc262971a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -231,7 +231,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap = new
LinkedHashMap<>();
- for (int i = 1; i < times.length; i++) { // times are sorted in session
API.
+ for (int i = 1; i < rowCount; i++) { // times are sorted in session API.
IDeviceID nextDeviceId = getDeviceID(i);
if (times[i] >= upperBoundOfTimePartition ||
!currDeviceId.equals(nextDeviceId)) {
final PartitionSplitInfo splitInfo =
@@ -253,7 +253,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId, deviceID1 -> new
PartitionSplitInfo());
// the final range
splitInfo.ranges.add(startLoc); // included
- splitInfo.ranges.add(times.length); // excluded
+ splitInfo.ranges.add(rowCount); // excluded
splitInfo.timePartitionSlots.add(timePartitionSlot);
return deviceIDSplitInfoMap;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index e7eafc4df72..4c255e0882f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -124,12 +124,18 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem
}
private Object convertTableColumn(final Object input) {
- return input instanceof LocalDate[]
- ? Arrays.stream(((LocalDate[]) input))
- .map(date -> Objects.nonNull(date) ?
DateUtils.parseDateExpressionToInt(date) : 0)
- .mapToInt(Integer::intValue)
- .toArray()
- : input;
+ if (input instanceof LocalDate[]) {
+ return Arrays.stream(((LocalDate[]) input))
+ .map(date -> Objects.nonNull(date) ?
DateUtils.parseDateExpressionToInt(date) : 0)
+ .mapToInt(Integer::intValue)
+ .toArray();
+ } else if (input instanceof Binary[]) {
+ return Arrays.stream(((Binary[]) input))
+ .map(binary -> Objects.nonNull(binary) ? binary : Binary.EMPTY_VALUE)
+ .toArray(Binary[]::new);
+ }
+
+ return input;
}
public InsertTabletStatement(InsertTabletNode node) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index 59ff67f902d..62deb10b368 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -62,16 +62,19 @@ public class LoadTsFileDataTypeConverter {
private final SqlParser relationalSqlParser = new SqlParser();
private final LoadTableStatementDataTypeConvertExecutionVisitor
- tableStatementDataTypeConvertExecutionVisitor =
- new
LoadTableStatementDataTypeConvertExecutionVisitor(this::executeForTableModel);
+ tableStatementDataTypeConvertExecutionVisitor;
private final LoadTreeStatementDataTypeConvertExecutionVisitor
- treeStatementDataTypeConvertExecutionVisitor =
- new
LoadTreeStatementDataTypeConvertExecutionVisitor(this::executeForTreeModel);
+ treeStatementDataTypeConvertExecutionVisitor;
public LoadTsFileDataTypeConverter(
final MPPQueryContext context, final boolean isGeneratedByPipe) {
this.context = context;
this.isGeneratedByPipe = isGeneratedByPipe;
+
+ tableStatementDataTypeConvertExecutionVisitor =
+ new
LoadTableStatementDataTypeConvertExecutionVisitor(this::executeForTableModel);
+ treeStatementDataTypeConvertExecutionVisitor =
+ new
LoadTreeStatementDataTypeConvertExecutionVisitor(this::executeForTreeModel);
}
public Optional<TSStatus> convertForTableModel(final LoadTsFile
loadTsFileTableStatement) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
index d194919c18d..5a75f4fb8e0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.storageengine.load.splitter;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.load.LoadFileException;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -62,6 +64,8 @@ import java.util.Set;
public class TsFileSplitter {
private static final Logger logger =
LoggerFactory.getLogger(TsFileSplitter.class);
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+
private final File tsFile;
private final TsFileDataConsumer consumer;
private Map<Long, IChunkMetadata> offset2ChunkMetadata = new HashMap<>();
@@ -72,6 +76,7 @@ public class TsFileSplitter {
private IDeviceID curDevice = null;
private boolean isAligned;
private int timeChunkIndexOfCurrentValueColumn = 0;
+ private Set<TTimePartitionSlot> timePartitionSlots = new HashSet<>();
// Maintain the number of times the chunk of each measurement appears.
private Map<String, Integer> valueColumn2TimeChunkIndex = new HashMap<>();
@@ -445,6 +450,14 @@ public class TsFileSplitter {
}
}
for (AlignedChunkData chunkData : chunkDataMap.keySet()) {
+ timePartitionSlots.add(chunkData.getTimePartitionSlot());
+ if (deletions.isEmpty()
+ && timePartitionSlots.size() >
CONFIG.getLoadTsFileSpiltPartitionMaxSize()) {
+ throw new LoadFileException(
+ String.format(
+ "Time partition slots size is greater than %s",
+ CONFIG.getLoadTsFileSpiltPartitionMaxSize()));
+ }
if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
throw new IllegalStateException(
String.format(
@@ -457,6 +470,14 @@ public class TsFileSplitter {
private void consumeChunkData(String measurement, long offset, ChunkData
chunkData)
throws LoadFileException {
+ timePartitionSlots.add(chunkData.getTimePartitionSlot());
+ if (deletions.isEmpty()
+ && timePartitionSlots.size() >
CONFIG.getLoadTsFileSpiltPartitionMaxSize()) {
+ throw new LoadFileException(
+ String.format(
+ "Time partition slots size is greater than %s",
+ CONFIG.getLoadTsFileSpiltPartitionMaxSize()));
+ }
if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
throw new IllegalStateException(
String.format(
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
index 807186c26da..8d23bc848b9 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
@@ -204,6 +204,7 @@ public class WritePlanNodeSplitTest {
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
insertTabletNode.setColumns(
new Object[] {new int[] {-20, -10, 10, 20, 30, 40, 50, 60, 70, 80, 90,
100}});
+ insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
dataPartitionQueryParam.setDeviceID(
@@ -314,6 +315,7 @@ public class WritePlanNodeSplitTest {
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
insertTabletNode.setColumns(
new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
+ insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, 2 * i);
insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 3"));
@@ -322,6 +324,7 @@ public class WritePlanNodeSplitTest {
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
insertTabletNode.setColumns(
new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
+ insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, 2 * i);
}