This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 40a0e8a0525 Pipe: Fix the inconsistency between the schema registered by TsFileWriter and the tablet written (#15304)
40a0e8a0525 is described below

commit 40a0e8a05259cfa2e2efc0f9d698d9b2a7df6c52
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Apr 11 10:25:30 2025 +0800

    Pipe: Fix the inconsistency between the schema registered by TsFileWriter and the tablet written (#15304)
---
 .../util/builder/PipeTableModeTsFileBuilder.java   | 35 ++++++++++++++++++----
 1 file changed, 29 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
index 2554e68c4b2..1cdb0236f57 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
@@ -28,6 +28,7 @@ import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.WriteUtils;
 import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,12 +37,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
@@ -207,10 +210,31 @@ public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
 
       final List<T> tabletsToWrite = new ArrayList<>();
       final Map<IDeviceID, Long> deviceLastTimestampMap = new HashMap<>();
+      String tableName = null;
+
+      final List<IMeasurementSchema> columnSchemas = new ArrayList<>();
+      final List<Tablet.ColumnCategory> columnCategories = new ArrayList<>();
+      final Set<String> columnNames = new HashSet<>();
+
       while (!tablets.isEmpty()) {
         final T pair = tablets.peekFirst();
         if (timestampsAreNonOverlapping(
             (Pair<Tablet, List<Pair<IDeviceID, Integer>>>) pair, 
deviceLastTimestampMap)) {
+          final Tablet tablet = pair.left;
+          if (tableName == null) {
+            tableName = tablet.getTableName();
+          }
+
+          for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) {
+            final IMeasurementSchema schema = tablet.getSchemas().get(i);
+            if (schema == null || 
columnNames.contains(schema.getMeasurementName())) {
+              continue;
+            }
+            columnNames.add(schema.getMeasurementName());
+            columnSchemas.add(schema);
+            columnCategories.add(tablet.getColumnTypes().get(i));
+          }
+
           tabletsToWrite.add(pair);
           tablets.pollFirst();
           continue;
@@ -221,14 +245,13 @@ public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
       if (tablets.isEmpty()) {
         iterator.remove();
       }
-      boolean schemaNotRegistered = true;
+
+      if (tableName != null) {
+        fileWriter.registerTableSchema(new TableSchema(tableName, 
columnSchemas, columnCategories));
+      }
+
       for (final Pair<Tablet, List<Pair<IDeviceID, Integer>>> pair : 
tabletsToWrite) {
         final Tablet tablet = pair.left;
-        if (schemaNotRegistered) {
-          fileWriter.registerTableSchema(
-              new TableSchema(tablet.getTableName(), tablet.getSchemas(), 
tablet.getColumnTypes()));
-          schemaNotRegistered = false;
-        }
         try {
           fileWriter.writeTable(tablet, pair.right);
         } catch (WriteProcessException e) {

Reply via email to