This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 18b10bc2d3b Pipe: Fix the inconsistency between schema and values
columns in the process of building tsfile (#15625)
18b10bc2d3b is described below
commit 18b10bc2d3b06603f58da807d17e34fc36b8081b
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jun 16 16:21:08 2025 +0800
Pipe: Fix the inconsistency between schema and values columns in the
process of building tsfile (#15625)
---
.../util/builder/PipeTableModelTsFileBuilder.java | 1 +
.../builder/PipeTableModelTsFileBuilderV2.java | 53 ++++++++++++++--------
.../util/builder/PipeTreeModelTsFileBuilder.java | 1 +
.../util/builder/PipeTreeModelTsFileBuilderV2.java | 42 +++++++++++------
4 files changed, 64 insertions(+), 33 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java
index 0766ce58e83..7f4d2db501c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java
@@ -256,6 +256,7 @@ public class PipeTableModelTsFileBuilder extends PipeTsFileBuilder {
final Set<IMeasurementSchema> seen = new HashSet<>();
final List<Integer> distinctIndices =
IntStream.range(0, aggregatedSchemas.size())
+ .filter(i -> Objects.nonNull(aggregatedSchemas.get(i)))
.filter(i -> seen.add(aggregatedSchemas.get(i))) // Only keep the
first occurrence index
.boxed()
.collect(Collectors.toList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java
index 7e6886deebb..46b63f4e0f4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java
@@ -31,6 +31,7 @@ import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
@@ -164,17 +165,16 @@ public class PipeTableModelTsFileBuilderV2 extends PipeTsFileBuilder {
List<IMeasurementSchema> aggregatedSchemas =
tablets.stream()
.flatMap(tablet -> tablet.getSchemas().stream())
- .filter(Objects::nonNull)
.collect(Collectors.toList());
List<ColumnCategory> aggregatedColumnCategories =
tablets.stream()
.flatMap(tablet -> tablet.getColumnTypes().stream())
- .filter(Objects::nonNull)
.collect(Collectors.toList());
final Set<IMeasurementSchema> seen = new HashSet<>();
final List<Integer> distinctIndices =
IntStream.range(0, aggregatedSchemas.size())
+ .filter(i -> Objects.nonNull(aggregatedSchemas.get(i)))
.filter(
i -> seen.add(aggregatedSchemas.get(i))) // Only keep the
first occurrence index
.boxed()
@@ -195,15 +195,23 @@ public class PipeTableModelTsFileBuilderV2 extends PipeTsFileBuilder {
for (int i = 0, size = tabletList.size(); i < size; ++i) {
final Tablet tablet = tabletList.get(i);
+ MeasurementSchema[] measurementSchemas =
+ tablet.getSchemas().stream()
+ .map(schema -> (MeasurementSchema) schema)
+ .toArray(MeasurementSchema[]::new);
+ Object[] values = Arrays.copyOf(tablet.getValues(),
tablet.getValues().length);
+ BitMap[] bitMaps = Arrays.copyOf(tablet.getBitMaps(),
tablet.getBitMaps().length);
+ ColumnCategory[] columnCategory = tablet.getColumnTypes().toArray(new
ColumnCategory[0]);
// convert date value to int refer to
//
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
- final Object[] values = Arrays.copyOf(tablet.getValues(),
tablet.getValues().length);
+ int validatedIndex = 0;
for (int j = 0; j < tablet.getSchemas().size(); ++j) {
- final IMeasurementSchema schema = tablet.getSchemas().get(j);
- if (Objects.nonNull(schema)
- && Objects.equals(TSDataType.DATE, schema.getType())
- && values[j] instanceof LocalDate[]) {
+ final MeasurementSchema schema = measurementSchemas[j];
+ if (Objects.isNull(schema) || Objects.isNull(columnCategory[j])) {
+ continue;
+ }
+ if (Objects.equals(TSDataType.DATE, schema.getType()) && values[j]
instanceof LocalDate[]) {
final LocalDate[] dates = ((LocalDate[]) values[j]);
final int[] dateValues = new int[dates.length];
for (int k = 0; k < Math.min(dates.length, tablet.getRowSize());
k++) {
@@ -213,6 +221,18 @@ public class PipeTableModelTsFileBuilderV2 extends PipeTsFileBuilder {
}
values[j] = dateValues;
}
+ measurementSchemas[validatedIndex] = schema;
+ values[validatedIndex] = values[j];
+ bitMaps[validatedIndex] = bitMaps[j];
+ columnCategory[validatedIndex] = columnCategory[j];
+ validatedIndex++;
+ }
+
+ if (validatedIndex != measurementSchemas.length) {
+ values = Arrays.copyOf(values, validatedIndex);
+ measurementSchemas = Arrays.copyOf(measurementSchemas, validatedIndex);
+ bitMaps = Arrays.copyOf(bitMaps, validatedIndex);
+ columnCategory = Arrays.copyOf(columnCategory, validatedIndex);
}
final RelationalInsertTabletNode insertTabletNode =
@@ -221,24 +241,19 @@ public class PipeTableModelTsFileBuilderV2 extends PipeTsFileBuilder {
new PartialPath(tablet.getTableName()),
// the data of the table model is aligned
true,
- tablet.getSchemas().stream()
- .filter(Objects::nonNull)
- .map(IMeasurementSchema::getMeasurementName)
+ Arrays.stream(measurementSchemas)
+ .map(MeasurementSchema::getMeasurementName)
.toArray(String[]::new),
- tablet.getSchemas().stream()
- .map(IMeasurementSchema::getType)
+ Arrays.stream(measurementSchemas)
+ .map(MeasurementSchema::getType)
.toArray(TSDataType[]::new),
// TODO: cast
- tablet.getSchemas().stream()
- .filter(Objects::nonNull)
- .map(schema -> (MeasurementSchema) schema)
- .toArray(MeasurementSchema[]::new),
+ measurementSchemas,
tablet.getTimestamps(),
- tablet.getBitMaps(),
+ bitMaps,
values,
tablet.getRowSize(),
- tablet.getColumnTypes().stream()
- .filter(Objects::nonNull)
+ Arrays.stream(columnCategory)
.map(TsTableColumnCategory::fromTsFileColumnCategory)
.toArray(TsTableColumnCategory[]::new));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java
index 237e294df69..824bc35eb4e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java
@@ -242,6 +242,7 @@ public class PipeTreeModelTsFileBuilder extends PipeTsFileBuilder {
final Set<IMeasurementSchema> seen = new HashSet<>();
final List<Integer> distinctIndices =
IntStream.range(0, aggregatedSchemas.size())
+ .filter(i -> Objects.nonNull(aggregatedSchemas.get(i)))
.filter(i -> seen.add(aggregatedSchemas.get(i))) // Only keep the
first occurrence index
.boxed()
.collect(Collectors.toList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java
index 9e8855a53b8..7f43d57de45 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
@@ -138,15 +139,23 @@ public class PipeTreeModelTsFileBuilderV2 extends PipeTsFileBuilder {
final IMemTable memTable, final RestorableTsFileIOWriter writer) throws
Exception {
for (int i = 0, size = tabletList.size(); i < size; ++i) {
final Tablet tablet = tabletList.get(i);
+ MeasurementSchema[] measurementSchemas =
+ tablet.getSchemas().stream()
+ .map(schema -> (MeasurementSchema) schema)
+ .toArray(MeasurementSchema[]::new);
+ Object[] values = Arrays.copyOf(tablet.getValues(),
tablet.getValues().length);
+ BitMap[] bitMaps = Arrays.copyOf(tablet.getBitMaps(),
tablet.getBitMaps().length);
// convert date value to int refer to
//
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
- final Object[] values = Arrays.copyOf(tablet.getValues(),
tablet.getValues().length);
+ int validatedIndex = 0;
for (int j = 0; j < tablet.getSchemas().size(); ++j) {
- final IMeasurementSchema schema = tablet.getSchemas().get(j);
- if (Objects.nonNull(schema)
- && Objects.equals(TSDataType.DATE, schema.getType())
- && values[j] instanceof LocalDate[]) {
+ final IMeasurementSchema schema = measurementSchemas[j];
+ if (Objects.isNull(schema)) {
+ break;
+ }
+
+ if (Objects.equals(TSDataType.DATE, schema.getType()) && values[j]
instanceof LocalDate[]) {
final LocalDate[] dates = ((LocalDate[]) values[j]);
final int[] dateValues = new int[dates.length];
for (int k = 0; k < Math.min(dates.length, tablet.getRowSize());
k++) {
@@ -154,6 +163,16 @@ public class PipeTreeModelTsFileBuilderV2 extends PipeTsFileBuilder {
}
values[j] = dateValues;
}
+ measurementSchemas[validatedIndex] = measurementSchemas[j];
+ values[validatedIndex] = values[j];
+ bitMaps[validatedIndex] = bitMaps[j];
+ validatedIndex++;
+ }
+
+ if (validatedIndex != measurementSchemas.length) {
+ values = Arrays.copyOf(values, validatedIndex);
+ measurementSchemas = Arrays.copyOf(measurementSchemas, validatedIndex);
+ bitMaps = Arrays.copyOf(bitMaps, validatedIndex);
}
final InsertTabletNode insertTabletNode =
@@ -161,21 +180,16 @@ public class PipeTreeModelTsFileBuilderV2 extends PipeTsFileBuilder {
PLACEHOLDER_PLAN_NODE_ID,
new PartialPath(tablet.getDeviceId()),
isTabletAlignedList.get(i),
- tablet.getSchemas().stream()
- .filter(Objects::nonNull)
+ Arrays.stream(measurementSchemas)
.map(IMeasurementSchema::getMeasurementName)
.toArray(String[]::new),
- tablet.getSchemas().stream()
- .filter(Objects::nonNull)
+ Arrays.stream(measurementSchemas)
.map(IMeasurementSchema::getType)
.toArray(TSDataType[]::new),
// TODO: cast
- tablet.getSchemas().stream()
- .filter(Objects::nonNull)
- .map(schema -> (MeasurementSchema) schema)
- .toArray(MeasurementSchema[]::new),
+ measurementSchemas,
tablet.getTimestamps(),
- tablet.getBitMaps(),
+ bitMaps,
values,
tablet.getRowSize());