This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9793441547c Fix that load-tsfile and pipe-parser may skip time-only
aligned chunks (#17625)
9793441547c is described below
commit 9793441547cfb32d62a906b225b02ab4df0ac149
Author: Jiang Tian <[email protected]>
AuthorDate: Tue May 12 09:37:35 2026 +0800
Fix that load-tsfile and pipe-parser may skip time-only aligned chunks
(#17625)
* Fix load-tsfile skipping time-only aligned chunks
* Fix LoadTsFileAnalyzer counting timestamps in the point count
* Fix tablet parsing
* Add test
---
.../apache/iotdb/db/it/IoTDBLoadLastCacheIT.java | 1 -
.../manual/enhanced/IoTDBPipeClusterIT.java | 33 ++++
.../relational/it/db/it/IoTDBLoadTsFileIT.java | 73 ++++++++
...ileInsertionEventTableParserTabletIterator.java | 143 ++++++++++-----
.../plan/analyze/load/LoadTsFileAnalyzer.java | 1 +
.../analyze/load/LoadTsFileTableSchemaCache.java | 24 ++-
.../load/splitter/TsFileSplitter.java | 9 +-
.../plan/analyze/load/LoadTsFileAnalyzerTest.java | 197 +++++++++++++++++++++
.../db/storageengine/load/TsFileSplitterTest.java | 157 ++++++++++++++++
9 files changed, 577 insertions(+), 61 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
index 755b3aef758..4fd1f6fab91 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
@@ -86,7 +86,6 @@ public class IoTDBLoadLastCacheIT {
new Object[][] {
{LastCacheLoadStrategy.CLEAN_ALL},
{LastCacheLoadStrategy.UPDATE},
- {LastCacheLoadStrategy.UPDATE_NO_BLOB},
{LastCacheLoadStrategy.CLEAN_DEVICE}
});
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index 4350d4072bb..ad283d4a02c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced;
+import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
import
org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -49,7 +50,9 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -1001,4 +1004,34 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
TableModelUtils.assertData("test", "test", -200, 100, receiverEnv,
handleFailure);
}
}
+
+ @Test
+ public void testHistoryDataWithEmptyField() {
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "CREATE DATABASE iot_table_stream_attr",
+ "USE iot_table_stream_attr",
+ "CREATE TABLE table1 (region STRING TAG, device_id STRING TAG,
model_id STRING ATTRIBUTE, maintenance STRING ATTRIBUTE COMMENT 'maintenance',
temperature FLOAT FIELD COMMENT 'temperature', humidity STRING ATTRIBUTE
COMMENT 'humidity', plant_id STRING TAG) COMMENT 'table1'",
+ String.format(
+ "create pipe test with source ('inclusion'='all') with
sink('node-urls'='%s')",
+ receiverEnv.getDataNodeWrapper(0).getIpAndPortString()),
+ "select * from table1 order by time",
+ "INSERT INTO table1(region, plant_id, device_id, model_id,
maintenance, time, temperature, humidity) VALUES ('north', null, 'd101', 'red',
null, '2025-11-26 13:38:00', 91.0, null), (null, '1003', null, null, 'maint-a',
'2025-11-26 13:39:00', null, '36.2'), (null, null, null, 'green', 'maint-b',
'2025-11-26 13:40:00', 88.8, '34.9')",
+ "INSERT INTO table1(region, plant_id, device_id, model_id,
maintenance, time, temperature, humidity) VALUES ('south', '1005', 'd105',
null, null, '2025-11-26 13:41:00', 87.5, null)",
+ "INSERT INTO table1(region, plant_id, device_id, model_id,
maintenance, time, temperature, humidity) VALUES ('west', '1006', 'd106',
'blue', 'maint-c', '2025-11-26 13:42:00', null, '36.8')"),
+ BaseEnv.TABLE_SQL_DIALECT);
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from iot_table_stream_attr.table1 order by time",
+
"time,region,device_id,model_id,maintenance,temperature,humidity,plant_id,",
+ new HashSet<>(
+ Arrays.asList(
+ "2025-11-26T13:38:00.000Z,north,d101,red,null,91.0,null,null,",
+
"2025-11-26T13:39:00.000Z,null,null,null,maint-a,null,36.2,1003,",
+
"2025-11-26T13:40:00.000Z,null,null,green,maint-b,88.8,34.9,null,",
+
"2025-11-26T13:41:00.000Z,south,d105,null,null,87.5,null,1005,",
+
"2025-11-26T13:42:00.000Z,west,d106,blue,maint-c,null,36.8,1006,")),
+ (String) null);
+ }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
index a58e0633b74..b351ace5a4d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
@@ -28,7 +28,10 @@ import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
@@ -486,6 +489,76 @@ public class IoTDBLoadTsFileIT {
return tableCreation;
}
+ @Test
+ public void testLoadWithAllFieldsNullRows() throws Exception {
+ final List<IMeasurementSchema> schemas =
+ Arrays.asList(
+ new MeasurementSchema("f1", TSDataType.INT32),
+ new MeasurementSchema("f2", TSDataType.INT64));
+ final List<ColumnCategory> columnCategories =
+ Arrays.asList(ColumnCategory.FIELD, ColumnCategory.FIELD);
+
+ final File file = new File(tmpDir, "1-0-0-0.tsfile");
+ final int totalRows = 20;
+
+ try (final TsFileWriter tsFileWriter = new TsFileWriter(file)) {
+ tsFileWriter.registerTableSchema(
+ new TableSchema(SchemaConfig.TABLE_0, schemas, columnCategories));
+
+ final List<String> columnNames = Arrays.asList("f1", "f2");
+ final List<TSDataType> dataTypes = Arrays.asList(TSDataType.INT32,
TSDataType.INT64);
+ final Tablet tablet =
+ new Tablet(SchemaConfig.TABLE_0, columnNames, dataTypes,
columnCategories);
+
+ for (int r = 0; r < totalRows; r++) {
+ final int row = tablet.getRowSize();
+ tablet.addTimestamp(row, (r + 1) * 1000L);
+ }
+ tsFileWriter.writeTable(tablet);
+ }
+
+ try (final Connection connection =
+ EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute(String.format("create database if not exists %s",
SchemaConfig.DATABASE_0));
+ statement.execute(String.format("use %s", SchemaConfig.DATABASE_0));
+ statement.execute(
+ String.format(
+ "load '%s' with ('database'='%s')", file.getAbsolutePath(),
SchemaConfig.DATABASE_0));
+
+ try (final ResultSet resultSet =
+ statement.executeQuery(String.format("select count(*) from %s",
SchemaConfig.TABLE_0))) {
+ Assert.assertTrue(resultSet.next());
+ Assert.assertEquals(totalRows, resultSet.getLong(1));
+ }
+
+ try (final ResultSet resultSet =
+ statement.executeQuery(
+ String.format("select count(f1), count(f2) from %s",
SchemaConfig.TABLE_0))) {
+ Assert.assertTrue(resultSet.next());
+ Assert.assertEquals(0, resultSet.getLong(1));
+ Assert.assertEquals(0, resultSet.getLong(2));
+ }
+
+ try (final ResultSet resultSet =
+ statement.executeQuery(
+ String.format("select time, f1, f2 from %s order by time",
SchemaConfig.TABLE_0))) {
+ int count = 0;
+ while (resultSet.next()) {
+ final long time = resultSet.getLong("time");
+ final int expectedTime = (count + 1) * 1000;
+ Assert.assertEquals(expectedTime, time);
+ resultSet.getInt("f1");
+ Assert.assertTrue(resultSet.wasNull());
+ resultSet.getLong("f2");
+ Assert.assertTrue(resultSet.wasNull());
+ count++;
+ }
+ Assert.assertEquals(totalRows, count);
+ }
+ }
+ }
+
private static class SchemaConfig {
private static final String DATABASE_0 = "root";
private static final String TABLE_0 = "test";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
index f05cf872c79..9d11d51d31a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
@@ -101,6 +101,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
private List<ColumnCategory> columnTypes;
private List<String> measurementList;
private List<TSDataType> dataTypeList;
+ private List<IMeasurementSchema> fieldSchemaList;
private int deviceIdSize;
private List<ModsOperationUtil.ModsInfo> modsInfoList;
@@ -194,7 +195,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
long size = 0;
List<AbstractAlignedChunkMetadata> iChunkMetadataList =
- reader.getAlignedChunkMetadata(pair.left, true);
+ reader.getAlignedChunkMetadata(pair.left, false);
Iterator<AbstractAlignedChunkMetadata> chunkMetadataIterator =
iChunkMetadataList.iterator();
@@ -213,27 +214,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
continue;
}
- Iterator<IChunkMetadata> iChunkMetadataIterator =
-
alignedChunkMetadata.getValueChunkMetadataList().iterator();
- while (iChunkMetadataIterator.hasNext()) {
- IChunkMetadata iChunkMetadata =
iChunkMetadataIterator.next();
- if (iChunkMetadata == null) {
- iChunkMetadataIterator.remove();
- continue;
- }
-
- if (!modifications.isEmpty()
- && ModsOperationUtil.isAllDeletedByMods(
- pair.getLeft(),
- iChunkMetadata.getMeasurementUid(),
- alignedChunkMetadata.getStartTime(),
- alignedChunkMetadata.getEndTime(),
- modifications)) {
- iChunkMetadataIterator.remove();
- }
- }
-
- if
(alignedChunkMetadata.getValueChunkMetadataList().isEmpty()) {
+ if (areAllFieldsDeletedByMods(pair.getLeft(),
alignedChunkMetadata)) {
chunkMetadataIterator.remove();
continue;
}
@@ -267,6 +248,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
dataTypeList = new ArrayList<>();
columnTypes = new ArrayList<>();
measurementList = new ArrayList<>();
+ fieldSchemaList = new ArrayList<>();
for (int i = 0; i < columnSchemaSize; i++) {
final IMeasurementSchema schema =
tableSchema.getColumnSchemas().get(i);
@@ -280,6 +262,9 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
measurementList.add(measurementName);
dataTypeList.add(schema.getType());
}
+ if (ColumnCategory.FIELD.equals(columnCategory)) {
+ fieldSchemaList.add(schema);
+ }
}
}
deviceIdSize = dataTypeList.size();
@@ -331,9 +316,9 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
tablet =
new Tablet(
tableName,
- measurementList,
- dataTypeList,
- columnTypes,
+ new ArrayList<>(measurementList),
+ new ArrayList<>(dataTypeList),
+ new ArrayList<>(columnTypes),
rowCountAndMemorySize.getLeft());
tablet.initBitMaps();
isFirstRow = false;
@@ -376,6 +361,20 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
long size = timeChunkSize;
final List<Chunk> valueChunkList = new ArrayList<>();
+ final Map<String, IChunkMetadata> valueChunkMetadataMap =
+ alignedChunkMetadata.getValueChunkMetadataList().stream()
+ .filter(Objects::nonNull)
+ .filter(
+ metadata ->
+ !isFieldDeletedByMods(
+ metadata.getMeasurementUid(),
+ alignedChunkMetadata.getStartTime(),
+ alignedChunkMetadata.getEndTime()))
+ .collect(
+ Collectors.toMap(
+ IChunkMetadata::getMeasurementUid,
+ metadata -> metadata,
+ (left, right) -> left));
// To ensure that the Tablet has the same alignedChunk column as the
current one,
// you need to create a new Tablet to fill in the data.
@@ -392,50 +391,98 @@ public class
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
measurementList.subList(deviceIdSize, measurementList.size()).clear();
dataTypeList.subList(deviceIdSize, dataTypeList.size()).clear();
- for (; offset < alignedChunkMetadata.getValueChunkMetadataList().size();
++offset) {
- final IChunkMetadata metadata =
alignedChunkMetadata.getValueChunkMetadataList().get(offset);
+ boolean hasSelectedField = fieldSchemaList.isEmpty();
+ boolean hasSelectedNonNullChunk = false;
+ for (; offset < fieldSchemaList.size(); ++offset) {
+ final IMeasurementSchema schema = fieldSchemaList.get(offset);
+ if (isFieldDeletedByMods(
+ schema.getMeasurementName(),
+ alignedChunkMetadata.getStartTime(),
+ alignedChunkMetadata.getEndTime())) {
+ continue;
+ }
+
+ final IChunkMetadata metadata =
valueChunkMetadataMap.get(schema.getMeasurementName());
+ Chunk chunk = null;
if (metadata != null) {
- final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata);
- size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
- if (size > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
- if (valueChunkList.isEmpty()) {
+ chunk = reader.readMemChunk((ChunkMetadata) metadata);
+ final long newSize = size +
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
+ if (newSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+ if (!hasSelectedNonNullChunk) {
// If the first chunk exceeds the memory limit, we need to
allocate more memory
+ size = newSize;
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk,
size);
- columnTypes.add(ColumnCategory.FIELD);
- measurementList.add(metadata.getMeasurementUid());
- dataTypeList.add(metadata.getDataType());
- valueChunkList.add(chunk);
- ++offset;
+ } else {
+ break;
}
- break;
} else {
- // Record the column information corresponding to Meta to fill in
Tablet
- columnTypes.add(ColumnCategory.FIELD);
- measurementList.add(metadata.getMeasurementUid());
- dataTypeList.add(metadata.getDataType());
- valueChunkList.add(chunk);
+ size = newSize;
}
+ hasSelectedNonNullChunk = true;
}
+ columnTypes.add(ColumnCategory.FIELD);
+ measurementList.add(schema.getMeasurementName());
+ dataTypeList.add(schema.getType());
+ valueChunkList.add(chunk);
+ hasSelectedField = true;
}
- if (offset >= alignedChunkMetadata.getValueChunkMetadataList().size()) {
+ if (offset >= fieldSchemaList.size()) {
currentChunkMetadata = null;
}
+ if (!hasSelectedField) {
+ this.chunkReader = null;
+ this.batchData = null;
+ return;
+ }
+
this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null);
this.modsInfoList =
ModsOperationUtil.initializeMeasurementMods(deviceID, measurementList,
modifications);
}
+ private boolean areAllFieldsDeletedByMods(
+ final IDeviceID currentDeviceID, final AbstractAlignedChunkMetadata
alignedChunkMetadata) {
+ if (modifications.isEmpty() || fieldSchemaList.isEmpty()) {
+ return false;
+ }
+
+ for (final IMeasurementSchema schema : fieldSchemaList) {
+ if (!ModsOperationUtil.isAllDeletedByMods(
+ currentDeviceID,
+ schema.getMeasurementName(),
+ alignedChunkMetadata.getStartTime(),
+ alignedChunkMetadata.getEndTime(),
+ modifications)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isFieldDeletedByMods(
+ final String measurementID, final long startTime, final long endTime) {
+ return !modifications.isEmpty()
+ && ModsOperationUtil.isAllDeletedByMods(
+ deviceID, measurementID, startTime, endTime, modifications);
+ }
+
private boolean fillMeasurementValueColumns(
final BatchData data, final Tablet tablet, final int rowIndex) {
- final TsPrimitiveType[] primitiveTypes = data.getVector();
+ final TsPrimitiveType[] primitiveTypes =
+ Objects.nonNull(data.getVector()) ? data.getVector() : new
TsPrimitiveType[0];
boolean needFillTime = false;
+ boolean hasNonDeletedField = dataTypeList.size() == deviceIdSize;
for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
- final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
- if (primitiveType == null
- || ModsOperationUtil.isDelete(data.currentTime(),
modsInfoList.get(i))) {
+ final TsPrimitiveType primitiveType =
+ i - deviceIdSize < primitiveTypes.length ? primitiveTypes[i -
deviceIdSize] : null;
+ final boolean isDeleted = ModsOperationUtil.isDelete(data.currentTime(),
modsInfoList.get(i));
+ if (!isDeleted) {
+ hasNonDeletedField = true;
+ }
+ if (primitiveType == null || isDeleted) {
switch (dataTypeList.get(i)) {
case TEXT:
case BLOB:
@@ -480,7 +527,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
throw new UnSupportedDataTypeException("UnSupported" +
primitiveType.getDataType());
}
}
- return needFillTime;
+ return needFillTime || hasNonDeletedField;
}
private void fillDeviceIdColumns(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index e1e6d597191..9dd8ad4eb02 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -632,6 +632,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadata) {
return device2TimeseriesMetadata.values().stream()
.flatMap(List::stream)
+ .filter(timeseriesMetadata ->
!timeseriesMetadata.getMeasurementId().isEmpty())
.mapToLong(t -> t.getStatistics().getCount())
.sum();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
index 237f5eea363..9363a54db4b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
@@ -137,15 +137,8 @@ public class LoadTsFileTableSchemaCache {
}
public void autoCreateAndVerify(final IDeviceID device) throws
LoadAnalyzeException {
- try {
- if (ModificationUtils.isDeviceDeletedByMods(currentModifications,
currentTimeIndex, device)) {
- return;
- }
- } catch (final IllegalPathException e) {
- LOGGER.warn(
- "Failed to check if device {} is deleted by mods. Will see it as not
deleted.",
- device,
- e);
+ if (isDeviceDeletedByMods(device)) {
+ return;
}
try {
@@ -167,6 +160,19 @@ public class LoadTsFileTableSchemaCache {
}
}
+ public boolean isDeviceDeletedByMods(final IDeviceID device) {
+ try {
+ return ModificationUtils.isDeviceDeletedByMods(
+ currentModifications, currentTimeIndex, device);
+ } catch (final IllegalPathException e) {
+ LOGGER.warn(
+ "Failed to check if device {} is deleted by mods. Will see it as not
deleted.",
+ device,
+ e);
+ return false;
+ }
+ }
+
private void addDevice(final IDeviceID device) {
final String tableName = device.getTableName();
long memoryUsageSizeInBytes = 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
index bbfd8f1bb30..e8480a93517 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
@@ -166,6 +166,12 @@ public class TsFileSplitter {
isAligned =
((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
== TsFileConstant.TIME_COLUMN_MASK);
+ if (isAligned) {
+ pageIndex2Times = new HashMap<>();
+ pageIndex2ChunkData = new HashMap<>();
+ isTimeChunkNeedDecode = true;
+ }
+
IChunkMetadata chunkMetadata = offset2ChunkMetadata.get(chunkOffset -
Byte.BYTES);
// When loading TsFile with Chunk in data zone but no matched ChunkMetadata
// at the end of file, this Chunk needs to be skipped.
@@ -359,9 +365,6 @@ public class TsFileSplitter {
pageIndex2TimesList.add(pageIndex2Times);
pageIndex2ChunkDataList.add(pageIndex2ChunkData);
isTimeChunkNeedDecodeList.add(isTimeChunkNeedDecode);
- pageIndex2Times = new HashMap<>();
- pageIndex2ChunkData = new HashMap<>();
- isTimeChunkNeedDecode = true;
}
private void switchToTimeChunkContextOfCurrentMeasurement(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
new file mode 100644
index 00000000000..d6bdb1e37fe
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.analyze.load;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.schema.Schema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class LoadTsFileAnalyzerTest {
+
+ private int dataNodeId;
+
+ @Before
+ public void setUp() {
+ dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+ IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0);
+ }
+
+ @After
+ public void tearDown() {
+ IoTDBDescriptor.getInstance().getConfig().setDataNodeId(dataNodeId);
+ }
+
+ @Test
+ public void testAnalyzeSingleTableFileShouldNotCountTimestampInPointCount()
throws Exception {
+ final File tsFile = new File("load-table-mixed-null-device.tsfile");
+ writeTableTsFileWithMixedDevices(tsFile);
+
+ final LoadTsFile statement =
+ new LoadTsFile(null, tsFile.getAbsolutePath(),
Collections.emptyMap()).setDatabase("db");
+ final TrackingLoadTsFileTableSchemaCache schemaCache = new
TrackingLoadTsFileTableSchemaCache();
+ try (final LoadTsFileAnalyzer analyzer =
+ new LoadTsFileAnalyzer(statement, false, new MPPQueryContext(new
QueryId("test")));
+ final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+ injectTableSchemaCache(analyzer, schemaCache);
+
+ final Method method =
+ LoadTsFileAnalyzer.class.getDeclaredMethod(
+ "doAnalyzeSingleTableFile",
+ File.class,
+ TsFileSequenceReader.class,
+ TsFileSequenceReaderTimeseriesMetadataIterator.class,
+ java.util.Map.class);
+ method.setAccessible(true);
+
+ final TsFileSequenceReaderTimeseriesMetadataIterator
timeseriesMetadataIterator =
+ new TsFileSequenceReaderTimeseriesMetadataIterator(reader, false);
+ method.invoke(
+ analyzer, tsFile, reader, timeseriesMetadataIterator,
reader.getTableSchemaMap());
+ } finally {
+ if (tsFile.exists()) {
+ Assert.assertTrue(tsFile.delete());
+ }
+ }
+
+ Assert.assertEquals(1, statement.getResources().size());
+ final TsFileResource resource = statement.getResources().get(0);
+ Assert.assertTrue(containsDevice(resource.getDevices(), "table1", "tagA"));
+ Assert.assertTrue(containsDevice(resource.getDevices(), "table1", "tagB"));
+ Assert.assertEquals(6L, statement.getWritePointCount(0));
+ Assert.assertTrue(schemaCache.containsDevice("table1", "tagA"));
+ Assert.assertTrue(schemaCache.containsDevice("table1", "tagB"));
+ Assert.assertEquals(2, schemaCache.getVerifiedDeviceCount());
+ }
+
+ private void writeTableTsFileWithMixedDevices(final File tsFile) throws
Exception {
+ if (tsFile.exists()) {
+ Assert.assertTrue(tsFile.delete());
+ }
+
+ final List<IMeasurementSchema> tableSchemaList =
+ Arrays.asList(
+ new MeasurementSchema("tag1", TSDataType.STRING),
+ new MeasurementSchema("s1", TSDataType.INT64),
+ new MeasurementSchema("s2", TSDataType.DOUBLE));
+ final List<ColumnCategory> columnCategoryList =
+ Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD,
ColumnCategory.FIELD);
+
+ final Schema schema = new Schema();
+ schema.registerTableSchema(new TableSchema("table1", tableSchemaList,
columnCategoryList));
+ try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) {
+ writer.setSchema(schema);
+
+ writeDevice(writer, tableSchemaList, new String[] {"table1", "tagA"},
false);
+ writeDevice(writer, tableSchemaList, new String[] {"table1", "tagB"},
true);
+
+ writer.endFile();
+ }
+ }
+
+ private void writeDevice(
+ final TsFileIOWriter writer,
+ final List<IMeasurementSchema> tableSchemaList,
+ final String[] deviceSegments,
+ final boolean areAllFieldsNull)
+ throws Exception {
+ writer.startChunkGroup(new StringArrayDeviceID(deviceSegments));
+
+ final AlignedChunkWriterImpl chunkWriter =
+ new AlignedChunkWriterImpl(tableSchemaList.subList(1,
tableSchemaList.size()));
+ for (int i = 0; i < 3; i++) {
+ final long time = 100 + i;
+ chunkWriter.getTimeChunkWriter().write(time);
+ chunkWriter.getValueChunkWriterByIndex(0).write(time, (long) i,
areAllFieldsNull);
+ chunkWriter.getValueChunkWriterByIndex(1).write(time, 0.5 + i,
areAllFieldsNull);
+ }
+ chunkWriter.writeToFileWriter(writer);
+ writer.endChunkGroup();
+ }
+
+ private void injectTableSchemaCache(
+ final LoadTsFileAnalyzer analyzer, final
TrackingLoadTsFileTableSchemaCache schemaCache)
+ throws Exception {
+ final Field tableSchemaCacheField =
+ LoadTsFileAnalyzer.class.getDeclaredField("tableSchemaCache");
+ tableSchemaCacheField.setAccessible(true);
+ tableSchemaCacheField.set(analyzer, schemaCache);
+ }
+
+ private boolean containsDevice(final Set<IDeviceID> devices, final String...
expectedSegments) {
+ return devices.stream()
+ .anyMatch(device -> Arrays.equals(device.getSegments(),
expectedSegments));
+ }
+
+ private static class TrackingLoadTsFileTableSchemaCache extends
LoadTsFileTableSchemaCache {
+
+ private final Set<List<Object>> verifiedDevices = new HashSet<>();
+
+ private TrackingLoadTsFileTableSchemaCache() throws
LoadRuntimeOutOfMemoryException {
+ super(null, new MPPQueryContext(new QueryId("load_test")), false);
+ }
+
+ @Override
+ public void autoCreateAndVerify(final IDeviceID device) {
+ verifiedDevices.add(Arrays.asList(device.getSegments()));
+ }
+
+ @Override
+ public boolean isDeviceDeletedByMods(final IDeviceID device) {
+ return false;
+ }
+
+ private boolean containsDevice(final String... expectedSegments) {
+ return verifiedDevices.contains(Arrays.asList((Object[])
expectedSegments));
+ }
+
+ private int getVerifiedDeviceCount() {
+ return verifiedDevices.size();
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/TsFileSplitterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/TsFileSplitterTest.java
new file mode 100644
index 00000000000..6610880567e
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/TsFileSplitterTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.splitter;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.schema.Schema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class TsFileSplitterTest {
+
+ @Test
+ public void testSplitTableTimeOnlyAlignedChunk() throws Exception {
+ final File sourceTsFile = new File("split-table-time-only-source.tsfile");
+ final File targetTsFile = new File("split-table-time-only-target.tsfile");
+ final IDeviceID deviceID = new StringArrayDeviceID("table1", "tagA");
+
+ try {
+ writeTableTsFileWithTimeOnlyChunk(sourceTsFile, deviceID);
+
+ final List<ChunkData> emittedChunkDataList = new ArrayList<>();
+ final TsFileSplitter splitter =
+ new TsFileSplitter(
+ sourceTsFile,
+ tsFileData -> {
+ if (tsFileData instanceof ChunkData) {
+ emittedChunkDataList.add((ChunkData) tsFileData);
+ }
+ return true;
+ });
+ splitter.splitTsFileByDataPartition();
+
+ if (targetTsFile.exists()) {
+ Assert.assertTrue(targetTsFile.delete());
+ }
+ try (final TsFileIOWriter writer = new TsFileIOWriter(targetTsFile)) {
+ writer.setSchema(createSchema());
+ IDeviceID currentDeviceID = null;
+ for (final ChunkData chunkData : emittedChunkDataList) {
+ if (!Objects.equals(currentDeviceID, chunkData.getDevice())) {
+ if (Objects.nonNull(currentDeviceID)) {
+ writer.endChunkGroup();
+ }
+ writer.startChunkGroup(chunkData.getDevice());
+ currentDeviceID = chunkData.getDevice();
+ }
+
+ writeSerializedChunkDataToWriter(chunkData, writer);
+ }
+ if (Objects.nonNull(currentDeviceID)) {
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ }
+
+ Assert.assertEquals(1, emittedChunkDataList.size());
+ try (final TsFileSequenceReader reader =
+ new TsFileSequenceReader(targetTsFile.getAbsolutePath())) {
+ final List<AbstractAlignedChunkMetadata> chunkMetadataList =
+ reader.getAlignedChunkMetadata(deviceID, false);
+ Assert.assertEquals(1, chunkMetadataList.size());
+ Assert.assertEquals(
+ 2,
chunkMetadataList.get(0).getTimeChunkMetadata().getStatistics().getCount());
+
Assert.assertTrue(chunkMetadataList.get(0).getValueChunkMetadataList().isEmpty());
+ }
+ } finally {
+ if (sourceTsFile.exists()) {
+ Assert.assertTrue(sourceTsFile.delete());
+ }
+ if (targetTsFile.exists()) {
+ Assert.assertTrue(targetTsFile.delete());
+ }
+ }
+ }
+
+ private void writeTableTsFileWithTimeOnlyChunk(final File tsFile, final
IDeviceID deviceID)
+ throws Exception {
+ if (tsFile.exists()) {
+ Assert.assertTrue(tsFile.delete());
+ }
+
+ try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) {
+ writer.setSchema(createSchema());
+ writer.startChunkGroup(deviceID);
+
+ final AlignedChunkWriterImpl chunkWriter =
+ new AlignedChunkWriterImpl(Collections.emptyList());
+ chunkWriter.write(100);
+ chunkWriter.write(101);
+ chunkWriter.writeToFileWriter(writer);
+
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ }
+
+ private Schema createSchema() {
+ final List<IMeasurementSchema> tableSchemaList =
+ Arrays.asList(
+ new MeasurementSchema("tag1", TSDataType.STRING),
+ new MeasurementSchema("s1", TSDataType.INT64));
+ final List<ColumnCategory> columnCategoryList =
+ Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD);
+
+ final Schema schema = new Schema();
+ schema.registerTableSchema(new TableSchema("table1", tableSchemaList,
columnCategoryList));
+ return schema;
+ }
+
+ private void writeSerializedChunkDataToWriter(
+ final ChunkData chunkData, final TsFileIOWriter writer) throws Exception
{
+ final ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
+ try (final DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ chunkData.serialize(dataOutputStream);
+ }
+ ((ChunkData)
+ TsFileData.deserialize(new
ByteArrayInputStream(byteArrayOutputStream.toByteArray())))
+ .writeToFileWriter(writer);
+ }
+}