This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 40a0e8a0525 Pipe: Fix the inconsistency between the schema registered
by TsFileWriter and the tablet written (#15304)
40a0e8a0525 is described below
commit 40a0e8a05259cfa2e2efc0f9d698d9b2a7df6c52
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Apr 11 10:25:30 2025 +0800
Pipe: Fix the inconsistency between the schema registered by TsFileWriter
and the tablet written (#15304)
---
.../util/builder/PipeTableModeTsFileBuilder.java | 35 ++++++++++++++++++----
1 file changed, 29 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
index 2554e68c4b2..1cdb0236f57 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
@@ -28,6 +28,7 @@ import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.WriteUtils;
import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,12 +37,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
@@ -207,10 +210,31 @@ public class PipeTableModeTsFileBuilder extends
PipeTsFileBuilder {
final List<T> tabletsToWrite = new ArrayList<>();
final Map<IDeviceID, Long> deviceLastTimestampMap = new HashMap<>();
+ String tableName = null;
+
+ final List<IMeasurementSchema> columnSchemas = new ArrayList<>();
+ final List<Tablet.ColumnCategory> columnCategories = new ArrayList<>();
+ final Set<String> columnNames = new HashSet<>();
+
while (!tablets.isEmpty()) {
final T pair = tablets.peekFirst();
if (timestampsAreNonOverlapping(
(Pair<Tablet, List<Pair<IDeviceID, Integer>>>) pair,
deviceLastTimestampMap)) {
+ final Tablet tablet = pair.left;
+ if (tableName == null) {
+ tableName = tablet.getTableName();
+ }
+
+ for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) {
+ final IMeasurementSchema schema = tablet.getSchemas().get(i);
+ if (schema == null ||
columnNames.contains(schema.getMeasurementName())) {
+ continue;
+ }
+ columnNames.add(schema.getMeasurementName());
+ columnSchemas.add(schema);
+ columnCategories.add(tablet.getColumnTypes().get(i));
+ }
+
tabletsToWrite.add(pair);
tablets.pollFirst();
continue;
@@ -221,14 +245,13 @@ public class PipeTableModeTsFileBuilder extends
PipeTsFileBuilder {
if (tablets.isEmpty()) {
iterator.remove();
}
- boolean schemaNotRegistered = true;
+
+ if (tableName != null) {
+ fileWriter.registerTableSchema(new TableSchema(tableName,
columnSchemas, columnCategories));
+ }
+
for (final Pair<Tablet, List<Pair<IDeviceID, Integer>>> pair :
tabletsToWrite) {
final Tablet tablet = pair.left;
- if (schemaNotRegistered) {
- fileWriter.registerTableSchema(
- new TableSchema(tablet.getTableName(), tablet.getSchemas(),
tablet.getColumnTypes()));
- schemaNotRegistered = false;
- }
try {
fileWriter.writeTable(tablet, pair.right);
} catch (WriteProcessException e) {