This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c4c947eb64 Pipe: Removed the useless ser-de in receiver raw req & Improved the handling logic for rowCount and null value bitmaps in insertTabletNode (#16133)
5c4c947eb64 is described below

commit 5c4c947eb64b966e56dd3bd6be6c684a02a928da
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 12 16:44:53 2025 +0800

    Pipe: Removed the useless ser-de in receiver raw req & Improved the handling logic for rowCount and null value bitmaps in insertTabletNode (#16133)
    
    * req
    
    * test
    
    * fix
    
    * test
    
    * partial
    
    * bug-fix
    
    * completion
---
 .../statement/PipeConvertedInsertRowStatement.java |  2 +-
 .../PipeConvertedInsertTabletStatement.java        |  2 +-
 .../request/PipeTransferTabletRawReq.java          | 22 +---------
 .../request/PipeTransferTabletRawReqV2.java        | 36 +---------------
 .../plan/planner/plan/node/write/InsertNode.java   |  9 ++--
 .../planner/plan/node/write/InsertTabletNode.java  | 32 +++++++-------
 .../plan/node/write/RelationalInsertRowNode.java   |  6 +--
 .../plan/node/write/RelationalInsertRowsNode.java  |  6 +--
 .../node/write/RelationalInsertTabletNode.java     | 12 +++---
 .../plan/statement/crud/InsertBaseStatement.java   | 22 +++++-----
 .../plan/statement/crud/InsertRowStatement.java    |  6 +--
 .../plan/statement/crud/InsertTabletStatement.java | 50 ++++++++++++++++++++--
 12 files changed, 96 insertions(+), 109 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
index edf8b636ecf..2484fd18de8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
@@ -53,7 +53,7 @@ public class PipeConvertedInsertRowStatement extends 
InsertRowStatement {
     measurements = insertRowStatement.getMeasurements();
     dataTypes = insertRowStatement.getDataTypes();
     columnCategories = insertRowStatement.getColumnCategories();
-    idColumnIndices = insertRowStatement.getIdColumnIndices();
+    tagColumnIndices = insertRowStatement.getTagColumnIndices();
     attrColumnIndices = insertRowStatement.getAttrColumnIndices();
     writeToTable = insertRowStatement.isWriteToTable();
     databaseName = insertRowStatement.getDatabaseName().orElse(null);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
index eee47a5b0b1..cf03c7c0c94 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
@@ -46,7 +46,7 @@ public class PipeConvertedInsertTabletStatement extends 
InsertTabletStatement {
     devicePath = insertTabletStatement.getDevicePath();
     isAligned = insertTabletStatement.isAligned();
     columnCategories = insertTabletStatement.getColumnCategories();
-    idColumnIndices = insertTabletStatement.getIdColumnIndices();
+    tagColumnIndices = insertTabletStatement.getTagColumnIndices();
     attrColumnIndices = insertTabletStatement.getAttrColumnIndices();
     writeToTable = insertTabletStatement.isWriteToTable();
     databaseName = insertTabletStatement.getDatabaseName().orElse(null);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 4e3888f8d0e..7da44f297d1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -22,18 +22,13 @@ package 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
 import org.apache.iotdb.commons.exception.MetadataException;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
-import org.apache.iotdb.commons.utils.PathUtils;
 import 
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
-import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.session.util.SessionUtils;
 
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,22 +63,7 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
         return new InsertTabletStatement();
       }
 
-      final TSInsertTabletReq request = new TSInsertTabletReq();
-
-      for (final IMeasurementSchema measurementSchema : tablet.getSchemas()) {
-        request.addToMeasurements(measurementSchema.getMeasurementName());
-        request.addToTypes(measurementSchema.getType().ordinal());
-      }
-
-      request.setPrefixPath(tablet.getDeviceId());
-      request.setIsAligned(isAligned);
-      request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
-      request.setValues(SessionUtils.getValueBuffer(tablet));
-      request.setSize(tablet.getRowSize());
-      request.setMeasurements(
-          
PathUtils.checkIsLegalSingleMeasurementsAndUpdate(request.getMeasurements()));
-
-      return StatementGenerator.createStatement(request);
+      return new InsertTabletStatement(tablet, isAligned, null);
     } catch (final MetadataException e) {
       LOGGER.warn("Generate Statement from tablet {} error.", tablet, e);
       return null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
index ee0e2fb220a..43d8501252c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
@@ -22,19 +22,14 @@ package 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
 import org.apache.iotdb.commons.exception.MetadataException;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
-import org.apache.iotdb.commons.utils.PathUtils;
 import 
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
 import 
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
-import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.session.util.SessionUtils;
 
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +37,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent.isTabletEmpty;
 
@@ -70,35 +64,7 @@ public class PipeTransferTabletRawReqV2 extends 
PipeTransferTabletRawReq {
         return new InsertTabletStatement();
       }
 
-      final TSInsertTabletReq request = new TSInsertTabletReq();
-
-      for (final IMeasurementSchema measurementSchema : tablet.getSchemas()) {
-        request.addToMeasurements(measurementSchema.getMeasurementName());
-        request.addToTypes(measurementSchema.getType().ordinal());
-      }
-
-      request.setPrefixPath(tablet.getDeviceId());
-      request.setIsAligned(isAligned);
-      request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
-      request.setValues(SessionUtils.getValueBuffer(tablet));
-      request.setSize(tablet.getRowSize());
-
-      // Tree model
-      if (Objects.isNull(dataBaseName)) {
-        request.setMeasurements(
-            
PathUtils.checkIsLegalSingleMeasurementsAndUpdate(request.getMeasurements()));
-        return StatementGenerator.createStatement(request);
-      }
-
-      // Table model
-      request.setWriteToTable(true);
-      request.columnCategories =
-          tablet.getColumnTypes().stream()
-              .map(t -> (byte) t.ordinal())
-              .collect(Collectors.toList());
-      final InsertTabletStatement statement = 
StatementGenerator.createStatement(request);
-      statement.setDatabaseName(dataBaseName);
-      return statement;
+      return new InsertTabletStatement(tablet, isAligned, dataBaseName);
     } catch (final MetadataException e) {
       LOGGER.warn("Generate Statement from tablet {} error.", tablet, e);
       return null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 102119269bb..f913bbd7a97 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -53,7 +53,6 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 public abstract class InsertNode extends SearchNode {
-
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
 
   /**
@@ -69,7 +68,7 @@ public abstract class InsertNode extends SearchNode {
   protected TSDataType[] dataTypes;
 
   protected TsTableColumnCategory[] columnCategories;
-  protected List<Integer> idColumnIndices;
+  protected List<Integer> tagColumnIndices;
   protected int measurementColumnCnt = -1;
 
   protected int failedMeasurementNumber = 0;
@@ -339,7 +338,7 @@ public abstract class InsertNode extends SearchNode {
   public boolean allMeasurementFailed() {
     if (measurements != null) {
       return failedMeasurementNumber
-          >= measurements.length - (idColumnIndices == null ? 0 : 
idColumnIndices.size());
+          >= measurements.length - (tagColumnIndices == null ? 0 : 
tagColumnIndices.size());
     }
     return true;
   }
@@ -398,10 +397,10 @@ public abstract class InsertNode extends SearchNode {
   public void setColumnCategories(TsTableColumnCategory[] columnCategories) {
     this.columnCategories = columnCategories;
     if (columnCategories != null) {
-      idColumnIndices = new ArrayList<>();
+      tagColumnIndices = new ArrayList<>();
       for (int i = 0; i < columnCategories.length; i++) {
         if (columnCategories[i].equals(TsTableColumnCategory.TAG)) {
-          idColumnIndices.add(i);
+          tagColumnIndices.add(i);
         }
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index c8b840cc6bb..be0b00931fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -74,7 +74,6 @@ import java.util.Objects;
 import static org.apache.iotdb.db.utils.CommonUtils.isAlive;
 
 public class InsertTabletNode extends InsertNode implements WALEntryValue {
-
   private static final String DATATYPE_UNSUPPORTED = "Data type %s is not 
supported.";
 
   protected long[] times; // times should be sorted. It is done in the session 
API.
@@ -528,17 +527,17 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
     }
   }
 
-  private void writeTimes(ByteBuffer buffer) {
+  private void writeTimes(final ByteBuffer buffer) {
     ReadWriteIOUtils.write(rowCount, buffer);
-    for (long time : times) {
-      ReadWriteIOUtils.write(time, buffer);
+    for (int i = 0; i < rowCount; ++i) {
+      ReadWriteIOUtils.write(times[i], buffer);
     }
   }
 
-  private void writeTimes(DataOutputStream stream) throws IOException {
+  private void writeTimes(final DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(rowCount, stream);
-    for (long time : times) {
-      ReadWriteIOUtils.write(time, stream);
+    for (int i = 0; i < rowCount; ++i) {
+      ReadWriteIOUtils.write(times[i], stream);
     }
   }
 
@@ -556,7 +555,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
           ReadWriteIOUtils.write(BytesUtils.boolToByte(false), buffer);
         } else {
           ReadWriteIOUtils.write(BytesUtils.boolToByte(true), buffer);
-          buffer.put(bitMaps[i].getByteArray());
+          buffer.put(bitMaps[i].getByteArray(), 0, 
BitMap.getSizeOfBytes(rowCount));
         }
       }
     }
@@ -576,7 +575,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
           ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream);
         } else {
           ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream);
-          stream.write(bitMaps[i].getByteArray());
+          stream.write(bitMaps[i].getByteArray(), 0, 
BitMap.getSizeOfBytes(rowCount));
         }
       }
     }
@@ -643,7 +642,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
       case STRING:
         Binary[] binaryValues = (Binary[]) column;
         for (int j = 0; j < rowCount; j++) {
-          if (binaryValues[j] != null) {
+          if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
             ReadWriteIOUtils.write(binaryValues[j], buffer);
           } else {
             ReadWriteIOUtils.write(0, buffer);
@@ -695,7 +694,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
       case BLOB:
         Binary[] binaryValues = (Binary[]) column;
         for (int j = 0; j < rowCount; j++) {
-          if (binaryValues[j] != null) {
+          if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
             ReadWriteIOUtils.write(binaryValues[j], stream);
           } else {
             ReadWriteIOUtils.write(0, stream);
@@ -751,6 +750,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
       bitMaps =
           QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize, 
rowCount).orElse(null);
     }
+
     columns =
         QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes, 
measurementSize, rowCount);
     isAligned = buffer.get() == 1;
@@ -966,7 +966,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
       case BLOB:
         Binary[] binaryValues = (Binary[]) column;
         for (int j = start; j < end; j++) {
-          if (binaryValues[j] != null) {
+          if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
             buffer.putInt(binaryValues[j].getLength());
             buffer.put(binaryValues[j].getValues());
           } else {
@@ -1310,10 +1310,10 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
         + Arrays.toString(measurements)
         + ", rowCount="
         + rowCount
-        + ", timeRange=[,"
-        + times[0]
-        + ", "
-        + times[times.length - 1]
+        + ", timeRange=["
+        + (Objects.nonNull(times) && times.length > 0
+            ? times[0] + ", " + times[times.length - 1]
+            : "")
         + "]"
         + '}';
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
index d4240970386..a824448dc78 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
@@ -87,10 +87,10 @@ public class RelationalInsertRowNode extends InsertRowNode {
   @Override
   public IDeviceID getDeviceID() {
     if (deviceID == null) {
-      String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+      String[] deviceIdSegments = new String[tagColumnIndices.size() + 1];
       deviceIdSegments[0] = this.getTableName();
-      for (int i = 0; i < idColumnIndices.size(); i++) {
-        final Integer columnIndex = idColumnIndices.get(i);
+      for (int i = 0; i < tagColumnIndices.size(); i++) {
+        final Integer columnIndex = tagColumnIndices.get(i);
         deviceIdSegments[i + 1] =
             getValues()[columnIndex] != null ? 
getValues()[columnIndex].toString() : null;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 71faebec3ca..77020d9220d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -59,10 +59,10 @@ public class RelationalInsertRowsNode extends 
InsertRowsNode {
       deviceIDs = new IDeviceID[getInsertRowNodeList().size()];
     }
     if (deviceIDs[rowIdx] == null) {
-      String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+      String[] deviceIdSegments = new String[tagColumnIndices.size() + 1];
       deviceIdSegments[0] = this.getTableName();
-      for (int i = 0; i < idColumnIndices.size(); i++) {
-        final Integer columnIndex = idColumnIndices.get(i);
+      for (int i = 0; i < tagColumnIndices.size(); i++) {
+        final Integer columnIndex = tagColumnIndices.get(i);
         deviceIdSegments[i + 1] =
             ((Object[]) 
getInsertRowNodeList().get(i).getValues()[columnIndex])[rowIdx].toString();
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 0d4b698108e..6e5e8eeb598 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -115,10 +115,10 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
         deviceIDs = new IDeviceID[1];
       }
       if (deviceIDs[0] == null) {
-        String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+        String[] deviceIdSegments = new String[tagColumnIndices.size() + 1];
         deviceIdSegments[0] = this.getTableName();
-        for (int i = 0; i < idColumnIndices.size(); i++) {
-          final Integer columnIndex = idColumnIndices.get(i);
+        for (int i = 0; i < tagColumnIndices.size(); i++) {
+          final Integer columnIndex = tagColumnIndices.get(i);
           Object idSeg = ((Object[]) columns[columnIndex])[0];
           boolean isNull =
               bitMaps != null && bitMaps[columnIndex] != null && 
bitMaps[columnIndex].isMarked(0);
@@ -132,10 +132,10 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
       deviceIDs = new IDeviceID[rowCount];
     }
     if (deviceIDs[rowIdx] == null) {
-      String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+      String[] deviceIdSegments = new String[tagColumnIndices.size() + 1];
       deviceIdSegments[0] = this.getTableName();
-      for (int i = 0; i < idColumnIndices.size(); i++) {
-        final Integer columnIndex = idColumnIndices.get(i);
+      for (int i = 0; i < tagColumnIndices.size(); i++) {
+        final Integer columnIndex = tagColumnIndices.get(i);
         Object idSeg = ((Object[]) columns[columnIndex])[rowIdx];
         boolean isNull =
             bitMaps != null
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index 28efbd84a81..950195c5248 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -87,7 +87,7 @@ public abstract class InsertBaseStatement extends Statement 
implements Accountab
   protected Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info;
 
   protected TsTableColumnCategory[] columnCategories;
-  protected List<Integer> idColumnIndices;
+  protected List<Integer> tagColumnIndices;
   protected List<Integer> attrColumnIndices;
   protected boolean writeToTable = false;
 
@@ -319,19 +319,19 @@ public abstract class InsertBaseStatement extends 
Statement implements Accountab
       columnCategories = new TsTableColumnCategory[measurements.length];
     }
     this.columnCategories[i] = columnCategory;
-    this.idColumnIndices = null;
+    this.tagColumnIndices = null;
   }
 
-  public List<Integer> getIdColumnIndices() {
-    if (idColumnIndices == null && columnCategories != null) {
-      idColumnIndices = new ArrayList<>();
+  public List<Integer> getTagColumnIndices() {
+    if (tagColumnIndices == null && columnCategories != null) {
+      tagColumnIndices = new ArrayList<>();
       for (int i = 0; i < columnCategories.length; i++) {
         if (columnCategories[i].equals(TsTableColumnCategory.TAG)) {
-          idColumnIndices.add(i);
+          tagColumnIndices.add(i);
         }
       }
     }
-    return idColumnIndices;
+    return tagColumnIndices;
   }
 
   public List<Integer> getAttrColumnIndices() {
@@ -432,7 +432,7 @@ public abstract class InsertBaseStatement extends Statement 
implements Accountab
     subRemoveAttributeColumns(columnsToKeep);
 
     // to reconstruct indices
-    idColumnIndices = null;
+    tagColumnIndices = null;
     attrColumnIndices = null;
   }
 
@@ -601,7 +601,7 @@ public abstract class InsertBaseStatement extends Statement 
implements Accountab
       System.arraycopy(
           columnCategories, pos, tmpCategories, pos + 1, 
columnCategories.length - pos);
       columnCategories = tmpCategories;
-      idColumnIndices = null;
+      tagColumnIndices = null;
     }
   }
 
@@ -626,7 +626,7 @@ public abstract class InsertBaseStatement extends Statement 
implements Accountab
     if (inputLocations != null) {
       CommonUtils.swapArray(inputLocations, src, target);
     }
-    idColumnIndices = null;
+    tagColumnIndices = null;
   }
 
   public boolean isWriteToTable() {
@@ -707,7 +707,7 @@ public abstract class InsertBaseStatement extends Statement 
implements Accountab
             + RamUsageEstimator.shallowSizeOf(dataTypes)
             + RamUsageEstimator.shallowSizeOf(columnCategories)
             // We assume that the integers are all cached by JVM
-            + shallowSizeOfList(idColumnIndices)
+            + shallowSizeOfList(tagColumnIndices)
             + shallowSizeOfList(attrColumnIndices)
             + shallowSizeOfList(logicalViewSchemaList)
             + (Objects.nonNull(logicalViewSchemaList)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index 1f789ee0051..52901f6f7fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -493,10 +493,10 @@ public class InsertRowStatement extends 
InsertBaseStatement implements ISchemaVa
   @TableModel
   public IDeviceID getTableDeviceID() {
     if (deviceID == null) {
-      String[] deviceIdSegments = new String[getIdColumnIndices().size() + 1];
+      String[] deviceIdSegments = new String[getTagColumnIndices().size() + 1];
       deviceIdSegments[0] = this.getTableName();
-      for (int i = 0; i < getIdColumnIndices().size(); i++) {
-        final Integer columnIndex = getIdColumnIndices().get(i);
+      for (int i = 0; i < getTagColumnIndices().size(); i++) {
+        final Integer columnIndex = getTagColumnIndices().get(i);
         deviceIdSegments[i + 1] =
             values[columnIndex] != null ? values[columnIndex].toString() : 
null;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 5aa327f5dbb..e7eafc4df72 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -20,7 +20,9 @@
 package org.apache.iotdb.db.queryengine.plan.statement.crud;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
@@ -29,6 +31,7 @@ import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import 
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
@@ -47,11 +50,15 @@ import 
org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.DateUtils;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 
+import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -61,7 +68,6 @@ import java.util.Map;
 import java.util.Objects;
 
 public class InsertTabletStatement extends InsertBaseStatement implements 
ISchemaValidation {
-
   private static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(InsertTabletStatement.class);
 
@@ -90,6 +96,42 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
     this.recordedEndOfLogicalViewSchemaList = 0;
   }
 
+  public InsertTabletStatement(
+      final Tablet tablet, final boolean isAligned, final String databaseName)
+      throws MetadataException {
+    this();
+    setMeasurements(
+        tablet.getSchemas().stream()
+            .map(IMeasurementSchema::getMeasurementName)
+            .toArray(String[]::new));
+    setDataTypes(
+        
tablet.getSchemas().stream().map(IMeasurementSchema::getType).toArray(TSDataType[]::new));
+    
setDevicePath(DataNodeDevicePathCache.getInstance().getPartialPath(tablet.getDeviceId()));
+    setAligned(isAligned);
+    setTimes(tablet.getTimestamps());
+    
setColumns(Arrays.stream(tablet.getValues()).map(this::convertTableColumn).toArray());
+    setBitMaps(tablet.getBitMaps());
+    setRowCount(tablet.getRowSize());
+
+    if (Objects.nonNull(databaseName)) {
+      setWriteToTable(true);
+      setDatabaseName(databaseName);
+      setColumnCategories(
+          tablet.getColumnTypes().stream()
+              .map(TsTableColumnCategory::fromTsFileColumnCategory)
+              .toArray(TsTableColumnCategory[]::new));
+    }
+  }
+
+  private Object convertTableColumn(final Object input) {
+    return input instanceof LocalDate[]
+        ? Arrays.stream(((LocalDate[]) input))
+            .map(date -> Objects.nonNull(date) ? 
DateUtils.parseDateExpressionToInt(date) : 0)
+            .mapToInt(Integer::intValue)
+            .toArray()
+        : input;
+  }
+
   public InsertTabletStatement(InsertTabletNode node) {
     this();
     setDevicePath(node.getTargetPath());
@@ -461,10 +503,10 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
       deviceIDs = new IDeviceID[rowCount];
     }
     if (deviceIDs[rowIdx] == null) {
-      String[] deviceIdSegments = new String[getIdColumnIndices().size() + 1];
+      String[] deviceIdSegments = new String[getTagColumnIndices().size() + 1];
       deviceIdSegments[0] = this.getTableName();
-      for (int i = 0; i < getIdColumnIndices().size(); i++) {
-        final Integer columnIndex = getIdColumnIndices().get(i);
+      for (int i = 0; i < getTagColumnIndices().size(); i++) {
+        final Integer columnIndex = getTagColumnIndices().get(i);
         boolean isNull = isNull(rowIdx, i);
         deviceIdSegments[i + 1] =
             isNull ? null : ((Object[]) 
columns[columnIndex])[rowIdx].toString();

Reply via email to