This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 18b10bc2d3b Pipe: Fix the inconsistency between schema and values columns in the process of building tsfile (#15625)
18b10bc2d3b is described below

commit 18b10bc2d3b06603f58da807d17e34fc36b8081b
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jun 16 16:21:08 2025 +0800

    Pipe: Fix the inconsistency between schema and values columns in the process of building tsfile (#15625)
---
 .../util/builder/PipeTableModelTsFileBuilder.java  |  1 +
 .../builder/PipeTableModelTsFileBuilderV2.java     | 53 ++++++++++++++--------
 .../util/builder/PipeTreeModelTsFileBuilder.java   |  1 +
 .../util/builder/PipeTreeModelTsFileBuilderV2.java | 42 +++++++++++------
 4 files changed, 64 insertions(+), 33 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java
index 0766ce58e83..7f4d2db501c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java
@@ -256,6 +256,7 @@ public class PipeTableModelTsFileBuilder extends PipeTsFileBuilder {
     final Set<IMeasurementSchema> seen = new HashSet<>();
     final List<Integer> distinctIndices =
         IntStream.range(0, aggregatedSchemas.size())
+            .filter(i -> Objects.nonNull(aggregatedSchemas.get(i)))
             .filter(i -> seen.add(aggregatedSchemas.get(i))) // Only keep the 
first occurrence index
             .boxed()
             .collect(Collectors.toList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java
index 7e6886deebb..46b63f4e0f4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java
@@ -31,6 +31,7 @@ import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.DateUtils;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
@@ -164,17 +165,16 @@ public class PipeTableModelTsFileBuilderV2 extends 
PipeTsFileBuilder {
       List<IMeasurementSchema> aggregatedSchemas =
           tablets.stream()
               .flatMap(tablet -> tablet.getSchemas().stream())
-              .filter(Objects::nonNull)
               .collect(Collectors.toList());
       List<ColumnCategory> aggregatedColumnCategories =
           tablets.stream()
               .flatMap(tablet -> tablet.getColumnTypes().stream())
-              .filter(Objects::nonNull)
               .collect(Collectors.toList());
 
       final Set<IMeasurementSchema> seen = new HashSet<>();
       final List<Integer> distinctIndices =
           IntStream.range(0, aggregatedSchemas.size())
+              .filter(i -> Objects.nonNull(aggregatedSchemas.get(i)))
               .filter(
                   i -> seen.add(aggregatedSchemas.get(i))) // Only keep the 
first occurrence index
               .boxed()
@@ -195,15 +195,23 @@ public class PipeTableModelTsFileBuilderV2 extends 
PipeTsFileBuilder {
 
     for (int i = 0, size = tabletList.size(); i < size; ++i) {
       final Tablet tablet = tabletList.get(i);
+      MeasurementSchema[] measurementSchemas =
+          tablet.getSchemas().stream()
+              .map(schema -> (MeasurementSchema) schema)
+              .toArray(MeasurementSchema[]::new);
+      Object[] values = Arrays.copyOf(tablet.getValues(), 
tablet.getValues().length);
+      BitMap[] bitMaps = Arrays.copyOf(tablet.getBitMaps(), 
tablet.getBitMaps().length);
+      ColumnCategory[] columnCategory = tablet.getColumnTypes().toArray(new 
ColumnCategory[0]);
 
       // convert date value to int refer to
       // 
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
-      final Object[] values = Arrays.copyOf(tablet.getValues(), 
tablet.getValues().length);
+      int validatedIndex = 0;
       for (int j = 0; j < tablet.getSchemas().size(); ++j) {
-        final IMeasurementSchema schema = tablet.getSchemas().get(j);
-        if (Objects.nonNull(schema)
-            && Objects.equals(TSDataType.DATE, schema.getType())
-            && values[j] instanceof LocalDate[]) {
+        final MeasurementSchema schema = measurementSchemas[j];
+        if (Objects.isNull(schema) || Objects.isNull(columnCategory[j])) {
+          continue;
+        }
+        if (Objects.equals(TSDataType.DATE, schema.getType()) && values[j] 
instanceof LocalDate[]) {
           final LocalDate[] dates = ((LocalDate[]) values[j]);
           final int[] dateValues = new int[dates.length];
           for (int k = 0; k < Math.min(dates.length, tablet.getRowSize()); 
k++) {
@@ -213,6 +221,18 @@ public class PipeTableModelTsFileBuilderV2 extends 
PipeTsFileBuilder {
           }
           values[j] = dateValues;
         }
+        measurementSchemas[validatedIndex] = schema;
+        values[validatedIndex] = values[j];
+        bitMaps[validatedIndex] = bitMaps[j];
+        columnCategory[validatedIndex] = columnCategory[j];
+        validatedIndex++;
+      }
+
+      if (validatedIndex != measurementSchemas.length) {
+        values = Arrays.copyOf(values, validatedIndex);
+        measurementSchemas = Arrays.copyOf(measurementSchemas, validatedIndex);
+        bitMaps = Arrays.copyOf(bitMaps, validatedIndex);
+        columnCategory = Arrays.copyOf(columnCategory, validatedIndex);
       }
 
       final RelationalInsertTabletNode insertTabletNode =
@@ -221,24 +241,19 @@ public class PipeTableModelTsFileBuilderV2 extends 
PipeTsFileBuilder {
               new PartialPath(tablet.getTableName()),
               // the data of the table model is aligned
               true,
-              tablet.getSchemas().stream()
-                  .filter(Objects::nonNull)
-                  .map(IMeasurementSchema::getMeasurementName)
+              Arrays.stream(measurementSchemas)
+                  .map(MeasurementSchema::getMeasurementName)
                   .toArray(String[]::new),
-              tablet.getSchemas().stream()
-                  .map(IMeasurementSchema::getType)
+              Arrays.stream(measurementSchemas)
+                  .map(MeasurementSchema::getType)
                   .toArray(TSDataType[]::new),
               // TODO: cast
-              tablet.getSchemas().stream()
-                  .filter(Objects::nonNull)
-                  .map(schema -> (MeasurementSchema) schema)
-                  .toArray(MeasurementSchema[]::new),
+              measurementSchemas,
               tablet.getTimestamps(),
-              tablet.getBitMaps(),
+              bitMaps,
               values,
               tablet.getRowSize(),
-              tablet.getColumnTypes().stream()
-                  .filter(Objects::nonNull)
+              Arrays.stream(columnCategory)
                   .map(TsTableColumnCategory::fromTsFileColumnCategory)
                   .toArray(TsTableColumnCategory[]::new));
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java
index 237e294df69..824bc35eb4e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java
@@ -242,6 +242,7 @@ public class PipeTreeModelTsFileBuilder extends PipeTsFileBuilder {
     final Set<IMeasurementSchema> seen = new HashSet<>();
     final List<Integer> distinctIndices =
         IntStream.range(0, aggregatedSchemas.size())
+            .filter(i -> Objects.nonNull(aggregatedSchemas.get(i)))
             .filter(i -> seen.add(aggregatedSchemas.get(i))) // Only keep the 
first occurrence index
             .boxed()
             .collect(Collectors.toList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java
index 9e8855a53b8..7f43d57de45 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.DateUtils;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
@@ -138,15 +139,23 @@ public class PipeTreeModelTsFileBuilderV2 extends 
PipeTsFileBuilder {
       final IMemTable memTable, final RestorableTsFileIOWriter writer) throws 
Exception {
     for (int i = 0, size = tabletList.size(); i < size; ++i) {
       final Tablet tablet = tabletList.get(i);
+      MeasurementSchema[] measurementSchemas =
+          tablet.getSchemas().stream()
+              .map(schema -> (MeasurementSchema) schema)
+              .toArray(MeasurementSchema[]::new);
+      Object[] values = Arrays.copyOf(tablet.getValues(), 
tablet.getValues().length);
+      BitMap[] bitMaps = Arrays.copyOf(tablet.getBitMaps(), 
tablet.getBitMaps().length);
 
       // convert date value to int refer to
       // 
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
-      final Object[] values = Arrays.copyOf(tablet.getValues(), 
tablet.getValues().length);
+      int validatedIndex = 0;
       for (int j = 0; j < tablet.getSchemas().size(); ++j) {
-        final IMeasurementSchema schema = tablet.getSchemas().get(j);
-        if (Objects.nonNull(schema)
-            && Objects.equals(TSDataType.DATE, schema.getType())
-            && values[j] instanceof LocalDate[]) {
+        final IMeasurementSchema schema = measurementSchemas[j];
+        if (Objects.isNull(schema)) {
+          break;
+        }
+
+        if (Objects.equals(TSDataType.DATE, schema.getType()) && values[j] 
instanceof LocalDate[]) {
           final LocalDate[] dates = ((LocalDate[]) values[j]);
           final int[] dateValues = new int[dates.length];
           for (int k = 0; k < Math.min(dates.length, tablet.getRowSize()); 
k++) {
@@ -154,6 +163,16 @@ public class PipeTreeModelTsFileBuilderV2 extends 
PipeTsFileBuilder {
           }
           values[j] = dateValues;
         }
+        measurementSchemas[validatedIndex] = measurementSchemas[j];
+        values[validatedIndex] = values[j];
+        bitMaps[validatedIndex] = bitMaps[j];
+        validatedIndex++;
+      }
+
+      if (validatedIndex != measurementSchemas.length) {
+        values = Arrays.copyOf(values, validatedIndex);
+        measurementSchemas = Arrays.copyOf(measurementSchemas, validatedIndex);
+        bitMaps = Arrays.copyOf(bitMaps, validatedIndex);
       }
 
       final InsertTabletNode insertTabletNode =
@@ -161,21 +180,16 @@ public class PipeTreeModelTsFileBuilderV2 extends 
PipeTsFileBuilder {
               PLACEHOLDER_PLAN_NODE_ID,
               new PartialPath(tablet.getDeviceId()),
               isTabletAlignedList.get(i),
-              tablet.getSchemas().stream()
-                  .filter(Objects::nonNull)
+              Arrays.stream(measurementSchemas)
                   .map(IMeasurementSchema::getMeasurementName)
                   .toArray(String[]::new),
-              tablet.getSchemas().stream()
-                  .filter(Objects::nonNull)
+              Arrays.stream(measurementSchemas)
                   .map(IMeasurementSchema::getType)
                   .toArray(TSDataType[]::new),
               // TODO: cast
-              tablet.getSchemas().stream()
-                  .filter(Objects::nonNull)
-                  .map(schema -> (MeasurementSchema) schema)
-                  .toArray(MeasurementSchema[]::new),
+              measurementSchemas,
               tablet.getTimestamps(),
-              tablet.getBitMaps(),
+              bitMaps,
               values,
               tablet.getRowSize());
 

Reply via email to