This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9793441547c Fix that load-tsfile and pipe-parser may skip time-only 
aligned chunks (#17625)
9793441547c is described below

commit 9793441547cfb32d62a906b225b02ab4df0ac149
Author: Jiang Tian <[email protected]>
AuthorDate: Tue May 12 09:37:35 2026 +0800

    Fix that load-tsfile and pipe-parser may skip time-only aligned chunks 
(#17625)
    
    * Fix that load tsfile may skip time-only aligned chunks
    
    * fix LoadTsFileAnalyzer may count timestamp in point count
    
    * Fix tablet parsing
    
    * add test
---
 .../apache/iotdb/db/it/IoTDBLoadLastCacheIT.java   |   1 -
 .../manual/enhanced/IoTDBPipeClusterIT.java        |  33 ++++
 .../relational/it/db/it/IoTDBLoadTsFileIT.java     |  73 ++++++++
 ...ileInsertionEventTableParserTabletIterator.java | 143 ++++++++++-----
 .../plan/analyze/load/LoadTsFileAnalyzer.java      |   1 +
 .../analyze/load/LoadTsFileTableSchemaCache.java   |  24 ++-
 .../load/splitter/TsFileSplitter.java              |   9 +-
 .../plan/analyze/load/LoadTsFileAnalyzerTest.java  | 197 +++++++++++++++++++++
 .../db/storageengine/load/TsFileSplitterTest.java  | 157 ++++++++++++++++
 9 files changed, 577 insertions(+), 61 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
index 755b3aef758..4fd1f6fab91 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
@@ -86,7 +86,6 @@ public class IoTDBLoadLastCacheIT {
         new Object[][] {
           {LastCacheLoadStrategy.CLEAN_ALL},
           {LastCacheLoadStrategy.UPDATE},
-          {LastCacheLoadStrategy.UPDATE_NO_BLOB},
           {LastCacheLoadStrategy.CLEAN_DEVICE}
         });
   }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index 4350d4072bb..ad283d4a02c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced;
+import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
 import 
org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -49,7 +50,9 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -1001,4 +1004,34 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
       TableModelUtils.assertData("test", "test", -200, 100, receiverEnv, 
handleFailure);
     }
   }
+
+  @Test
+  public void testHistoryDataWithEmptyField() {
+    TestUtils.executeNonQueries(
+        senderEnv,
+        Arrays.asList(
+            "CREATE DATABASE iot_table_stream_attr",
+            "USE iot_table_stream_attr",
+            "CREATE TABLE table1 (region STRING TAG, device_id STRING TAG, 
model_id STRING ATTRIBUTE, maintenance STRING ATTRIBUTE COMMENT 'maintenance', 
temperature FLOAT FIELD COMMENT 'temperature', humidity STRING ATTRIBUTE 
COMMENT 'humidity', plant_id STRING TAG) COMMENT 'table1'",
+            String.format(
+                "create pipe test with source ('inclusion'='all') with 
sink('node-urls'='%s')",
+                receiverEnv.getDataNodeWrapper(0).getIpAndPortString()),
+            "select * from table1 order by time",
+            "INSERT INTO table1(region, plant_id, device_id, model_id, 
maintenance, time, temperature, humidity) VALUES ('north', null, 'd101', 'red', 
null, '2025-11-26 13:38:00', 91.0, null), (null, '1003', null, null, 'maint-a', 
'2025-11-26 13:39:00', null, '36.2'), (null, null, null, 'green', 'maint-b', 
'2025-11-26 13:40:00', 88.8, '34.9')",
+            "INSERT INTO table1(region, plant_id, device_id, model_id, 
maintenance, time, temperature, humidity) VALUES ('south', '1005', 'd105', 
null, null, '2025-11-26 13:41:00', 87.5, null)",
+            "INSERT INTO table1(region, plant_id, device_id, model_id, 
maintenance, time, temperature, humidity) VALUES ('west', '1006', 'd106', 
'blue', 'maint-c', '2025-11-26 13:42:00', null, '36.8')"),
+        BaseEnv.TABLE_SQL_DIALECT);
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select * from iot_table_stream_attr.table1 order by time",
+        
"time,region,device_id,model_id,maintenance,temperature,humidity,plant_id,",
+        new HashSet<>(
+            Arrays.asList(
+                "2025-11-26T13:38:00.000Z,north,d101,red,null,91.0,null,null,",
+                
"2025-11-26T13:39:00.000Z,null,null,null,maint-a,null,36.2,1003,",
+                
"2025-11-26T13:40:00.000Z,null,null,green,maint-b,88.8,34.9,null,",
+                
"2025-11-26T13:41:00.000Z,south,d105,null,null,87.5,null,1005,",
+                
"2025-11-26T13:42:00.000Z,west,d106,blue,maint-c,null,36.8,1006,")),
+        (String) null);
+  }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
index a58e0633b74..b351ace5a4d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
@@ -28,7 +28,10 @@ import org.apache.iotdb.itbase.env.BaseEnv;
 
 import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
@@ -486,6 +489,76 @@ public class IoTDBLoadTsFileIT {
     return tableCreation;
   }
 
+  @Test
+  public void testLoadWithAllFieldsNullRows() throws Exception {
+    final List<IMeasurementSchema> schemas =
+        Arrays.asList(
+            new MeasurementSchema("f1", TSDataType.INT32),
+            new MeasurementSchema("f2", TSDataType.INT64));
+    final List<ColumnCategory> columnCategories =
+        Arrays.asList(ColumnCategory.FIELD, ColumnCategory.FIELD);
+
+    final File file = new File(tmpDir, "1-0-0-0.tsfile");
+    final int totalRows = 20;
+
+    try (final TsFileWriter tsFileWriter = new TsFileWriter(file)) {
+      tsFileWriter.registerTableSchema(
+          new TableSchema(SchemaConfig.TABLE_0, schemas, columnCategories));
+
+      final List<String> columnNames = Arrays.asList("f1", "f2");
+      final List<TSDataType> dataTypes = Arrays.asList(TSDataType.INT32, 
TSDataType.INT64);
+      final Tablet tablet =
+          new Tablet(SchemaConfig.TABLE_0, columnNames, dataTypes, 
columnCategories);
+
+      for (int r = 0; r < totalRows; r++) {
+        final int row = tablet.getRowSize();
+        tablet.addTimestamp(row, (r + 1) * 1000L);
+      }
+      tsFileWriter.writeTable(tablet);
+    }
+
+    try (final Connection connection =
+            EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(String.format("create database if not exists %s", 
SchemaConfig.DATABASE_0));
+      statement.execute(String.format("use %s", SchemaConfig.DATABASE_0));
+      statement.execute(
+          String.format(
+              "load '%s' with ('database'='%s')", file.getAbsolutePath(), 
SchemaConfig.DATABASE_0));
+
+      try (final ResultSet resultSet =
+          statement.executeQuery(String.format("select count(*) from %s", 
SchemaConfig.TABLE_0))) {
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals(totalRows, resultSet.getLong(1));
+      }
+
+      try (final ResultSet resultSet =
+          statement.executeQuery(
+              String.format("select count(f1), count(f2) from %s", 
SchemaConfig.TABLE_0))) {
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals(0, resultSet.getLong(1));
+        Assert.assertEquals(0, resultSet.getLong(2));
+      }
+
+      try (final ResultSet resultSet =
+          statement.executeQuery(
+              String.format("select time, f1, f2 from %s order by time", 
SchemaConfig.TABLE_0))) {
+        int count = 0;
+        while (resultSet.next()) {
+          final long time = resultSet.getLong("time");
+          final int expectedTime = (count + 1) * 1000;
+          Assert.assertEquals(expectedTime, time);
+          resultSet.getInt("f1");
+          Assert.assertTrue(resultSet.wasNull());
+          resultSet.getLong("f2");
+          Assert.assertTrue(resultSet.wasNull());
+          count++;
+        }
+        Assert.assertEquals(totalRows, count);
+      }
+    }
+  }
+
   private static class SchemaConfig {
     private static final String DATABASE_0 = "root";
     private static final String TABLE_0 = "test";
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
index f05cf872c79..9d11d51d31a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
@@ -101,6 +101,7 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
   private List<ColumnCategory> columnTypes;
   private List<String> measurementList;
   private List<TSDataType> dataTypeList;
+  private List<IMeasurementSchema> fieldSchemaList;
   private int deviceIdSize;
 
   private List<ModsOperationUtil.ModsInfo> modsInfoList;
@@ -194,7 +195,7 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
 
               long size = 0;
               List<AbstractAlignedChunkMetadata> iChunkMetadataList =
-                  reader.getAlignedChunkMetadata(pair.left, true);
+                  reader.getAlignedChunkMetadata(pair.left, false);
 
               Iterator<AbstractAlignedChunkMetadata> chunkMetadataIterator =
                   iChunkMetadataList.iterator();
@@ -213,27 +214,7 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
                   continue;
                 }
 
-                Iterator<IChunkMetadata> iChunkMetadataIterator =
-                    
alignedChunkMetadata.getValueChunkMetadataList().iterator();
-                while (iChunkMetadataIterator.hasNext()) {
-                  IChunkMetadata iChunkMetadata = 
iChunkMetadataIterator.next();
-                  if (iChunkMetadata == null) {
-                    iChunkMetadataIterator.remove();
-                    continue;
-                  }
-
-                  if (!modifications.isEmpty()
-                      && ModsOperationUtil.isAllDeletedByMods(
-                          pair.getLeft(),
-                          iChunkMetadata.getMeasurementUid(),
-                          alignedChunkMetadata.getStartTime(),
-                          alignedChunkMetadata.getEndTime(),
-                          modifications)) {
-                    iChunkMetadataIterator.remove();
-                  }
-                }
-
-                if 
(alignedChunkMetadata.getValueChunkMetadataList().isEmpty()) {
+                if (areAllFieldsDeletedByMods(pair.getLeft(), 
alignedChunkMetadata)) {
                   chunkMetadataIterator.remove();
                   continue;
                 }
@@ -267,6 +248,7 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
               dataTypeList = new ArrayList<>();
               columnTypes = new ArrayList<>();
               measurementList = new ArrayList<>();
+              fieldSchemaList = new ArrayList<>();
 
               for (int i = 0; i < columnSchemaSize; i++) {
                 final IMeasurementSchema schema = 
tableSchema.getColumnSchemas().get(i);
@@ -280,6 +262,9 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
                     measurementList.add(measurementName);
                     dataTypeList.add(schema.getType());
                   }
+                  if (ColumnCategory.FIELD.equals(columnCategory)) {
+                    fieldSchemaList.add(schema);
+                  }
                 }
               }
               deviceIdSize = dataTypeList.size();
@@ -331,9 +316,9 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
           tablet =
               new Tablet(
                   tableName,
-                  measurementList,
-                  dataTypeList,
-                  columnTypes,
+                  new ArrayList<>(measurementList),
+                  new ArrayList<>(dataTypeList),
+                  new ArrayList<>(columnTypes),
                   rowCountAndMemorySize.getLeft());
           tablet.initBitMaps();
           isFirstRow = false;
@@ -376,6 +361,20 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
     long size = timeChunkSize;
 
     final List<Chunk> valueChunkList = new ArrayList<>();
+    final Map<String, IChunkMetadata> valueChunkMetadataMap =
+        alignedChunkMetadata.getValueChunkMetadataList().stream()
+            .filter(Objects::nonNull)
+            .filter(
+                metadata ->
+                    !isFieldDeletedByMods(
+                        metadata.getMeasurementUid(),
+                        alignedChunkMetadata.getStartTime(),
+                        alignedChunkMetadata.getEndTime()))
+            .collect(
+                Collectors.toMap(
+                    IChunkMetadata::getMeasurementUid,
+                    metadata -> metadata,
+                    (left, right) -> left));
 
     // To ensure that the Tablet has the same alignedChunk column as the 
current one,
     // you need to create a new Tablet to fill in the data.
@@ -392,50 +391,98 @@ public class 
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
     measurementList.subList(deviceIdSize, measurementList.size()).clear();
     dataTypeList.subList(deviceIdSize, dataTypeList.size()).clear();
 
-    for (; offset < alignedChunkMetadata.getValueChunkMetadataList().size(); 
++offset) {
-      final IChunkMetadata metadata = 
alignedChunkMetadata.getValueChunkMetadataList().get(offset);
+    boolean hasSelectedField = fieldSchemaList.isEmpty();
+    boolean hasSelectedNonNullChunk = false;
+    for (; offset < fieldSchemaList.size(); ++offset) {
+      final IMeasurementSchema schema = fieldSchemaList.get(offset);
+      if (isFieldDeletedByMods(
+          schema.getMeasurementName(),
+          alignedChunkMetadata.getStartTime(),
+          alignedChunkMetadata.getEndTime())) {
+        continue;
+      }
+
+      final IChunkMetadata metadata = 
valueChunkMetadataMap.get(schema.getMeasurementName());
+      Chunk chunk = null;
       if (metadata != null) {
-        final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata);
-        size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
-        if (size > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
-          if (valueChunkList.isEmpty()) {
+        chunk = reader.readMemChunk((ChunkMetadata) metadata);
+        final long newSize = size + 
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
+        if (newSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+          if (!hasSelectedNonNullChunk) {
             // If the first chunk exceeds the memory limit, we need to 
allocate more memory
+            size = newSize;
             
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, 
size);
-            columnTypes.add(ColumnCategory.FIELD);
-            measurementList.add(metadata.getMeasurementUid());
-            dataTypeList.add(metadata.getDataType());
-            valueChunkList.add(chunk);
-            ++offset;
+          } else {
+            break;
           }
-          break;
         } else {
-          // Record the column information corresponding to Meta to fill in 
Tablet
-          columnTypes.add(ColumnCategory.FIELD);
-          measurementList.add(metadata.getMeasurementUid());
-          dataTypeList.add(metadata.getDataType());
-          valueChunkList.add(chunk);
+          size = newSize;
         }
+        hasSelectedNonNullChunk = true;
       }
+      columnTypes.add(ColumnCategory.FIELD);
+      measurementList.add(schema.getMeasurementName());
+      dataTypeList.add(schema.getType());
+      valueChunkList.add(chunk);
+      hasSelectedField = true;
     }
 
-    if (offset >= alignedChunkMetadata.getValueChunkMetadataList().size()) {
+    if (offset >= fieldSchemaList.size()) {
       currentChunkMetadata = null;
     }
 
+    if (!hasSelectedField) {
+      this.chunkReader = null;
+      this.batchData = null;
+      return;
+    }
+
     this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null);
     this.modsInfoList =
         ModsOperationUtil.initializeMeasurementMods(deviceID, measurementList, 
modifications);
   }
 
+  private boolean areAllFieldsDeletedByMods(
+      final IDeviceID currentDeviceID, final AbstractAlignedChunkMetadata 
alignedChunkMetadata) {
+    if (modifications.isEmpty() || fieldSchemaList.isEmpty()) {
+      return false;
+    }
+
+    for (final IMeasurementSchema schema : fieldSchemaList) {
+      if (!ModsOperationUtil.isAllDeletedByMods(
+          currentDeviceID,
+          schema.getMeasurementName(),
+          alignedChunkMetadata.getStartTime(),
+          alignedChunkMetadata.getEndTime(),
+          modifications)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean isFieldDeletedByMods(
+      final String measurementID, final long startTime, final long endTime) {
+    return !modifications.isEmpty()
+        && ModsOperationUtil.isAllDeletedByMods(
+            deviceID, measurementID, startTime, endTime, modifications);
+  }
+
   private boolean fillMeasurementValueColumns(
       final BatchData data, final Tablet tablet, final int rowIndex) {
-    final TsPrimitiveType[] primitiveTypes = data.getVector();
+    final TsPrimitiveType[] primitiveTypes =
+        Objects.nonNull(data.getVector()) ? data.getVector() : new 
TsPrimitiveType[0];
     boolean needFillTime = false;
+    boolean hasNonDeletedField = dataTypeList.size() == deviceIdSize;
 
     for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
-      final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
-      if (primitiveType == null
-          || ModsOperationUtil.isDelete(data.currentTime(), 
modsInfoList.get(i))) {
+      final TsPrimitiveType primitiveType =
+          i - deviceIdSize < primitiveTypes.length ? primitiveTypes[i - 
deviceIdSize] : null;
+      final boolean isDeleted = ModsOperationUtil.isDelete(data.currentTime(), 
modsInfoList.get(i));
+      if (!isDeleted) {
+        hasNonDeletedField = true;
+      }
+      if (primitiveType == null || isDeleted) {
         switch (dataTypeList.get(i)) {
           case TEXT:
           case BLOB:
@@ -480,7 +527,7 @@ public class TsFileInsertionEventTableParserTabletIterator 
implements Iterator<T
           throw new UnSupportedDataTypeException("UnSupported" + 
primitiveType.getDataType());
       }
     }
-    return needFillTime;
+    return needFillTime || hasNonDeletedField;
   }
 
   private void fillDeviceIdColumns(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index e1e6d597191..9dd8ad4eb02 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -632,6 +632,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadata) {
     return device2TimeseriesMetadata.values().stream()
         .flatMap(List::stream)
+        .filter(timeseriesMetadata -> 
!timeseriesMetadata.getMeasurementId().isEmpty())
         .mapToLong(t -> t.getStatistics().getCount())
         .sum();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
index 237f5eea363..9363a54db4b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
@@ -137,15 +137,8 @@ public class LoadTsFileTableSchemaCache {
   }
 
   public void autoCreateAndVerify(final IDeviceID device) throws 
LoadAnalyzeException {
-    try {
-      if (ModificationUtils.isDeviceDeletedByMods(currentModifications, 
currentTimeIndex, device)) {
-        return;
-      }
-    } catch (final IllegalPathException e) {
-      LOGGER.warn(
-          "Failed to check if device {} is deleted by mods. Will see it as not 
deleted.",
-          device,
-          e);
+    if (isDeviceDeletedByMods(device)) {
+      return;
     }
 
     try {
@@ -167,6 +160,19 @@ public class LoadTsFileTableSchemaCache {
     }
   }
 
+  public boolean isDeviceDeletedByMods(final IDeviceID device) {
+    try {
+      return ModificationUtils.isDeviceDeletedByMods(
+          currentModifications, currentTimeIndex, device);
+    } catch (final IllegalPathException e) {
+      LOGGER.warn(
+          "Failed to check if device {} is deleted by mods. Will see it as not 
deleted.",
+          device,
+          e);
+      return false;
+    }
+  }
+
   private void addDevice(final IDeviceID device) {
     final String tableName = device.getTableName();
     long memoryUsageSizeInBytes = 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
index bbfd8f1bb30..e8480a93517 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
@@ -166,6 +166,12 @@ public class TsFileSplitter {
     isAligned =
         ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
             == TsFileConstant.TIME_COLUMN_MASK);
+    if (isAligned) {
+      pageIndex2Times = new HashMap<>();
+      pageIndex2ChunkData = new HashMap<>();
+      isTimeChunkNeedDecode = true;
+    }
+
     IChunkMetadata chunkMetadata = offset2ChunkMetadata.get(chunkOffset - 
Byte.BYTES);
     // When loading TsFile with Chunk in data zone but no matched ChunkMetadata
     // at the end of file, this Chunk needs to be skipped.
@@ -359,9 +365,6 @@ public class TsFileSplitter {
     pageIndex2TimesList.add(pageIndex2Times);
     pageIndex2ChunkDataList.add(pageIndex2ChunkData);
     isTimeChunkNeedDecodeList.add(isTimeChunkNeedDecode);
-    pageIndex2Times = new HashMap<>();
-    pageIndex2ChunkData = new HashMap<>();
-    isTimeChunkNeedDecode = true;
   }
 
   private void switchToTimeChunkContextOfCurrentMeasurement(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
new file mode 100644
index 00000000000..d6bdb1e37fe
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.analyze.load;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.schema.Schema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class LoadTsFileAnalyzerTest {
+
+  private int dataNodeId;
+
+  @Before
+  public void setUp() {
+    dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+    IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0);
+  }
+
+  @After
+  public void tearDown() {
+    IoTDBDescriptor.getInstance().getConfig().setDataNodeId(dataNodeId);
+  }
+
+  @Test
+  public void testAnalyzeSingleTableFileShouldNotCountTimestampInPointCount() 
throws Exception {
+    final File tsFile = new File("load-table-mixed-null-device.tsfile");
+    writeTableTsFileWithMixedDevices(tsFile);
+
+    final LoadTsFile statement =
+        new LoadTsFile(null, tsFile.getAbsolutePath(), 
Collections.emptyMap()).setDatabase("db");
+    final TrackingLoadTsFileTableSchemaCache schemaCache = new 
TrackingLoadTsFileTableSchemaCache();
+    try (final LoadTsFileAnalyzer analyzer =
+            new LoadTsFileAnalyzer(statement, false, new MPPQueryContext(new 
QueryId("test")));
+        final TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+      injectTableSchemaCache(analyzer, schemaCache);
+
+      final Method method =
+          LoadTsFileAnalyzer.class.getDeclaredMethod(
+              "doAnalyzeSingleTableFile",
+              File.class,
+              TsFileSequenceReader.class,
+              TsFileSequenceReaderTimeseriesMetadataIterator.class,
+              java.util.Map.class);
+      method.setAccessible(true);
+
+      final TsFileSequenceReaderTimeseriesMetadataIterator 
timeseriesMetadataIterator =
+          new TsFileSequenceReaderTimeseriesMetadataIterator(reader, false);
+      method.invoke(
+          analyzer, tsFile, reader, timeseriesMetadataIterator, 
reader.getTableSchemaMap());
+    } finally {
+      if (tsFile.exists()) {
+        Assert.assertTrue(tsFile.delete());
+      }
+    }
+
+    Assert.assertEquals(1, statement.getResources().size());
+    final TsFileResource resource = statement.getResources().get(0);
+    Assert.assertTrue(containsDevice(resource.getDevices(), "table1", "tagA"));
+    Assert.assertTrue(containsDevice(resource.getDevices(), "table1", "tagB"));
+    Assert.assertEquals(6L, statement.getWritePointCount(0));
+    Assert.assertTrue(schemaCache.containsDevice("table1", "tagA"));
+    Assert.assertTrue(schemaCache.containsDevice("table1", "tagB"));
+    Assert.assertEquals(2, schemaCache.getVerifiedDeviceCount());
+  }
+
+  private void writeTableTsFileWithMixedDevices(final File tsFile) throws 
Exception {
+    if (tsFile.exists()) {
+      Assert.assertTrue(tsFile.delete());
+    }
+
+    final List<IMeasurementSchema> tableSchemaList =
+        Arrays.asList(
+            new MeasurementSchema("tag1", TSDataType.STRING),
+            new MeasurementSchema("s1", TSDataType.INT64),
+            new MeasurementSchema("s2", TSDataType.DOUBLE));
+    final List<ColumnCategory> columnCategoryList =
+        Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, 
ColumnCategory.FIELD);
+
+    final Schema schema = new Schema();
+    schema.registerTableSchema(new TableSchema("table1", tableSchemaList, 
columnCategoryList));
+    try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) {
+      writer.setSchema(schema);
+
+      writeDevice(writer, tableSchemaList, new String[] {"table1", "tagA"}, 
false);
+      writeDevice(writer, tableSchemaList, new String[] {"table1", "tagB"}, 
true);
+
+      writer.endFile();
+    }
+  }
+
+  private void writeDevice(
+      final TsFileIOWriter writer,
+      final List<IMeasurementSchema> tableSchemaList,
+      final String[] deviceSegments,
+      final boolean areAllFieldsNull)
+      throws Exception {
+    writer.startChunkGroup(new StringArrayDeviceID(deviceSegments));
+
+    final AlignedChunkWriterImpl chunkWriter =
+        new AlignedChunkWriterImpl(tableSchemaList.subList(1, 
tableSchemaList.size()));
+    for (int i = 0; i < 3; i++) {
+      final long time = 100 + i;
+      chunkWriter.getTimeChunkWriter().write(time);
+      chunkWriter.getValueChunkWriterByIndex(0).write(time, (long) i, 
areAllFieldsNull);
+      chunkWriter.getValueChunkWriterByIndex(1).write(time, 0.5 + i, 
areAllFieldsNull);
+    }
+    chunkWriter.writeToFileWriter(writer);
+    writer.endChunkGroup();
+  }
+
+  private void injectTableSchemaCache(
+      final LoadTsFileAnalyzer analyzer, final 
TrackingLoadTsFileTableSchemaCache schemaCache)
+      throws Exception {
+    final Field tableSchemaCacheField =
+        LoadTsFileAnalyzer.class.getDeclaredField("tableSchemaCache");
+    tableSchemaCacheField.setAccessible(true);
+    tableSchemaCacheField.set(analyzer, schemaCache);
+  }
+
+  private boolean containsDevice(final Set<IDeviceID> devices, final String... 
expectedSegments) {
+    return devices.stream()
+        .anyMatch(device -> Arrays.equals(device.getSegments(), 
expectedSegments));
+  }
+
+  private static class TrackingLoadTsFileTableSchemaCache extends 
LoadTsFileTableSchemaCache {
+
+    private final Set<List<Object>> verifiedDevices = new HashSet<>();
+
+    private TrackingLoadTsFileTableSchemaCache() throws 
LoadRuntimeOutOfMemoryException {
+      super(null, new MPPQueryContext(new QueryId("load_test")), false);
+    }
+
+    @Override
+    public void autoCreateAndVerify(final IDeviceID device) {
+      verifiedDevices.add(Arrays.asList(device.getSegments()));
+    }
+
+    @Override
+    public boolean isDeviceDeletedByMods(final IDeviceID device) {
+      return false;
+    }
+
+    private boolean containsDevice(final String... expectedSegments) {
+      return verifiedDevices.contains(Arrays.asList((Object[]) 
expectedSegments));
+    }
+
+    private int getVerifiedDeviceCount() {
+      return verifiedDevices.size();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/TsFileSplitterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/TsFileSplitterTest.java
new file mode 100644
index 00000000000..6610880567e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/TsFileSplitterTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.splitter;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.schema.Schema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class TsFileSplitterTest {
+
+  @Test
+  public void testSplitTableTimeOnlyAlignedChunk() throws Exception {
+    final File sourceTsFile = new File("split-table-time-only-source.tsfile");
+    final File targetTsFile = new File("split-table-time-only-target.tsfile");
+    final IDeviceID deviceID = new StringArrayDeviceID("table1", "tagA");
+
+    try {
+      writeTableTsFileWithTimeOnlyChunk(sourceTsFile, deviceID);
+
+      final List<ChunkData> emittedChunkDataList = new ArrayList<>();
+      final TsFileSplitter splitter =
+          new TsFileSplitter(
+              sourceTsFile,
+              tsFileData -> {
+                if (tsFileData instanceof ChunkData) {
+                  emittedChunkDataList.add((ChunkData) tsFileData);
+                }
+                return true;
+              });
+      splitter.splitTsFileByDataPartition();
+
+      if (targetTsFile.exists()) {
+        Assert.assertTrue(targetTsFile.delete());
+      }
+      try (final TsFileIOWriter writer = new TsFileIOWriter(targetTsFile)) {
+        writer.setSchema(createSchema());
+        IDeviceID currentDeviceID = null;
+        for (final ChunkData chunkData : emittedChunkDataList) {
+          if (!Objects.equals(currentDeviceID, chunkData.getDevice())) {
+            if (Objects.nonNull(currentDeviceID)) {
+              writer.endChunkGroup();
+            }
+            writer.startChunkGroup(chunkData.getDevice());
+            currentDeviceID = chunkData.getDevice();
+          }
+
+          writeSerializedChunkDataToWriter(chunkData, writer);
+        }
+        if (Objects.nonNull(currentDeviceID)) {
+          writer.endChunkGroup();
+        }
+        writer.endFile();
+      }
+
+      Assert.assertEquals(1, emittedChunkDataList.size());
+      try (final TsFileSequenceReader reader =
+          new TsFileSequenceReader(targetTsFile.getAbsolutePath())) {
+        final List<AbstractAlignedChunkMetadata> chunkMetadataList =
+            reader.getAlignedChunkMetadata(deviceID, false);
+        Assert.assertEquals(1, chunkMetadataList.size());
+        Assert.assertEquals(
+            2, 
chunkMetadataList.get(0).getTimeChunkMetadata().getStatistics().getCount());
+        
Assert.assertTrue(chunkMetadataList.get(0).getValueChunkMetadataList().isEmpty());
+      }
+    } finally {
+      if (sourceTsFile.exists()) {
+        Assert.assertTrue(sourceTsFile.delete());
+      }
+      if (targetTsFile.exists()) {
+        Assert.assertTrue(targetTsFile.delete());
+      }
+    }
+  }
+
+  private void writeTableTsFileWithTimeOnlyChunk(final File tsFile, final 
IDeviceID deviceID)
+      throws Exception {
+    if (tsFile.exists()) {
+      Assert.assertTrue(tsFile.delete());
+    }
+
+    try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) {
+      writer.setSchema(createSchema());
+      writer.startChunkGroup(deviceID);
+
+      final AlignedChunkWriterImpl chunkWriter =
+          new AlignedChunkWriterImpl(Collections.emptyList());
+      chunkWriter.write(100);
+      chunkWriter.write(101);
+      chunkWriter.writeToFileWriter(writer);
+
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+  }
+
+  private Schema createSchema() {
+    final List<IMeasurementSchema> tableSchemaList =
+        Arrays.asList(
+            new MeasurementSchema("tag1", TSDataType.STRING),
+            new MeasurementSchema("s1", TSDataType.INT64));
+    final List<ColumnCategory> columnCategoryList =
+        Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD);
+
+    final Schema schema = new Schema();
+    schema.registerTableSchema(new TableSchema("table1", tableSchemaList, 
columnCategoryList));
+    return schema;
+  }
+
+  private void writeSerializedChunkDataToWriter(
+      final ChunkData chunkData, final TsFileIOWriter writer) throws Exception 
{
+    final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+    try (final DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      chunkData.serialize(dataOutputStream);
+    }
+    ((ChunkData)
+            TsFileData.deserialize(new 
ByteArrayInputStream(byteArrayOutputStream.toByteArray())))
+        .writeToFileWriter(writer);
+  }
+}


Reply via email to