This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 13b0582dfb1 Pipe: Modify the TableRawReq deserialization method to
support direct conversion to TableStatement. (#16844)
13b0582dfb1 is described below
commit 13b0582dfb1a63a3f242b9545304d0d9fdede5cd
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Dec 9 15:00:00 2025 +0800
Pipe: Modify the TableRawReq deserialization method to support
direct conversion to TableStatement. (#16844)
* Pipe: Modify the TableRawReq deserialization method to support direct
conversion to TableStatement.
* fix
* fix
* fix
* fix
* fix
* update
* update
* update
* refactor: optimize TabletStatementConverter according to code review
- Optimize times array copy: skip copy when lengths are equal, use
System.arraycopy
- Add warning logs when times array is null or too small
- Ensure all arrays (values, times, bitMaps) are copied to rowSize length
for immutability
- Filter out null columns when converting Statement to Tablet
- Rename idColumnIndices to tagColumnIndices
- Add skipString method to avoid constructing temporary objects
- Add comments explaining skipped fields in readMeasurement
- Use direct buffer position increment instead of reading bytes for skipping
- Ensure all column values are copied to ensure immutability
* update
* update
---
.../request/PipeTransferTabletBatchReqV2.java | 7 +-
.../request/PipeTransferTabletRawReq.java | 110 +++-
.../request/PipeTransferTabletRawReqV2.java | 50 +-
.../pipe/sink/util/TabletStatementConverter.java | 476 ++++++++++++++++
.../sink/util/sorter/InsertEventDataAdapter.java | 127 +++++
.../util/sorter/InsertTabletStatementAdapter.java | 118 ++++
...EventSorter.java => PipeInsertEventSorter.java} | 94 +++-
.../sorter/PipeTableModelTabletEventSorter.java | 67 ++-
.../sorter/PipeTreeModelTabletEventSorter.java | 48 +-
.../db/pipe/sink/util/sorter/TabletAdapter.java | 113 ++++
.../plan/statement/crud/InsertBaseStatement.java | 10 +
.../plan/statement/crud/InsertTabletStatement.java | 197 +++++++
.../pipe/sink/PipeDataNodeThriftRequestTest.java | 4 +-
.../db/pipe/sink/PipeStatementEventSorterTest.java | 313 +++++++++++
.../sink/util/TabletStatementConverterTest.java | 607 +++++++++++++++++++++
15 files changed, 2257 insertions(+), 84 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
index d9c3fabfae8..c136ffbe7d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.apache.tsfile.write.record.Tablet;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -247,11 +246,7 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
- batchReq.tabletReqs.add(
- PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
- Tablet.deserialize(transferReq.body),
- ReadWriteIOUtils.readBool(transferReq.body),
- ReadWriteIOUtils.readString(transferReq.body)));
+
batchReq.tabletReqs.add(PipeTransferTabletRawReqV2.toTPipeTransferRawReq(transferReq.body));
}
batchReq.version = transferReq.version;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 7da44f297d1..af9b37edbf6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
import org.apache.iotdb.commons.exception.MetadataException;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -43,10 +44,25 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTabletRawReq.class);
- protected transient Tablet tablet;
+ protected transient InsertTabletStatement statement;
+
protected transient boolean isAligned;
+ protected transient Tablet tablet;
+ /**
+ * Returns the tablet, lazily converting it from {@code statement} when only the statement was
+ * deserialized; a successful conversion is cached in {@code tablet}.
+ *
+ * @return the tablet, or {@code null} when neither a tablet nor a statement is present, or when
+ * {@code convertToTablet()} throws a {@link MetadataException} (logged at WARN and not cached,
+ * so the conversion is attempted again on the next call)
+ */
public Tablet getTablet() {
+ if (tablet == null && statement != null) {
+ try {
+ tablet = statement.convertToTablet();
+ } catch (final MetadataException e) {
+ LOGGER.warn("Failed to convert statement to tablet.", e);
+ return null;
+ }
+ }
return tablet;
}
@@ -54,16 +70,29 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
return isAligned;
}
+ /**
+ * Construct Statement. If statement already exists, return it. Otherwise,
convert from tablet.
+ *
+ * @return InsertTabletStatement
+ */
public InsertTabletStatement constructStatement() {
+ if (statement != null) {
+ new
PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
+ return statement;
+ }
+
+ // Sort and deduplicate tablet before converting
new
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
try {
if (isTabletEmpty(tablet)) {
// Empty statement, will be filtered after construction
- return new InsertTabletStatement();
+ statement = new InsertTabletStatement();
+ return statement;
}
- return new InsertTabletStatement(tablet, isAligned, null);
+ statement = new InsertTabletStatement(tablet, isAligned, null);
+ return statement;
} catch (final MetadataException e) {
LOGGER.warn("Generate Statement from tablet {} error.", tablet, e);
return null;
@@ -107,8 +136,20 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
public static PipeTransferTabletRawReq fromTPipeTransferReq(final
TPipeTransferReq transferReq) {
final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
- tabletReq.tablet = Tablet.deserialize(transferReq.body);
- tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body);
+ final ByteBuffer buffer = transferReq.body;
+ final int startPosition = buffer.position();
+ try {
+ // V1: no databaseName, readDatabaseName = false
+ final InsertTabletStatement insertTabletStatement =
+
TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, false);
+ tabletReq.isAligned = insertTabletStatement.isAligned();
+ // devicePath is already set in deserializeStatementFromTabletFormat for
V1 format
+ tabletReq.statement = insertTabletStatement;
+ } catch (final Exception e) {
+ buffer.position(startPosition);
+ tabletReq.tablet = Tablet.deserialize(buffer);
+ tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer);
+ }
tabletReq.version = transferReq.version;
tabletReq.type = transferReq.type;
@@ -118,18 +159,56 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
/////////////////////////////// Air Gap ///////////////////////////////
- public static byte[] toTPipeTransferBytes(final Tablet tablet, final boolean
isAligned)
- throws IOException {
+ /**
+ * Serializes this request in the V1 TRANSFER_TABLET_RAW layout: version, request type, the
+ * tablet, then the isAligned flag. When no tablet is held, one is converted from {@code
+ * statement} for this call only (it is kept in a local and not cached, unlike {@link
+ * #getTablet()}), and in that case the statement's own isAligned flag is the one written.
+ *
+ * @return serialized bytes
+ * @throws IOException if converting the statement fails, if neither a tablet nor a statement is
+ * present, or if writing to the output stream fails
+ */
+ public byte[] toTPipeTransferBytes() throws IOException {
+ Tablet tabletToSerialize = tablet;
+ boolean isAlignedToSerialize = isAligned;
+
+ // If tablet is null, convert from statement (local only; this.tablet is left untouched)
+ if (tabletToSerialize == null && statement != null) {
+ try {
+ tabletToSerialize = statement.convertToTablet();
+ isAlignedToSerialize = statement.isAligned();
+ } catch (final MetadataException e) {
+ throw new IOException("Failed to convert statement to tablet for
serialization", e);
+ }
+ }
+
+ if (tabletToSerialize == null) {
+ throw new IOException("Cannot serialize: both tablet and statement are
null");
+ }
+
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBSinkRequestVersion.VERSION_1.getVersion(),
outputStream);
ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET_RAW.getType(),
outputStream);
- tablet.serialize(outputStream);
- ReadWriteIOUtils.write(isAligned, outputStream);
+ tabletToSerialize.serialize(outputStream);
+ ReadWriteIOUtils.write(isAlignedToSerialize, outputStream);
return byteArrayOutputStream.toByteArray();
}
}
+  /**
+   * Serializes {@code tablet} in the V1 raw-tablet layout. Kept for callers of the former static
+   * API: the arguments are wrapped in a throwaway request and the instance method does the work.
+   *
+   * @param tablet tablet to serialize
+   * @param isAligned whether the tablet is aligned
+   * @return serialized bytes
+   * @throws IOException if serialization fails, including when {@code tablet} is null
+   */
+  public static byte[] toTPipeTransferBytes(final Tablet tablet, final boolean isAligned)
+      throws IOException {
+    final PipeTransferTabletRawReq transientRequest = new PipeTransferTabletRawReq();
+    transientRequest.isAligned = isAligned;
+    transientRequest.tablet = tablet;
+    return transientRequest.toTPipeTransferBytes();
+  }
+
/////////////////////////////// Object ///////////////////////////////
@Override
@@ -141,7 +220,16 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
return false;
}
final PipeTransferTabletRawReq that = (PipeTransferTabletRawReq) obj;
- return Objects.equals(tablet, that.tablet)
+ // Compare statement if both have it, otherwise compare tablet
+ if (statement != null && that.statement != null) {
+ return Objects.equals(statement, that.statement)
+ && isAligned == that.isAligned
+ && version == that.version
+ && type == that.type
+ && Objects.equals(body, that.body);
+ }
+ // Fallback to tablet comparison
+ return Objects.equals(getTablet(), that.getTablet())
&& isAligned == that.isAligned
&& version == that.version
&& type == that.type
@@ -150,6 +238,6 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
@Override
public int hashCode() {
- return Objects.hash(tablet, isAligned, version, type, body);
+ // getTablet() converts (and caches) a tablet from the statement when only the statement is
+ // held, so hashing a statement-backed request may trigger that conversion.
+ return Objects.hash(getTablet(), isAligned, version, type, body);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
index 43d8501252c..3c5f420a317 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
import org.apache.iotdb.commons.exception.MetadataException;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
@@ -52,6 +53,16 @@ public class PipeTransferTabletRawReqV2 extends
PipeTransferTabletRawReq {
@Override
public InsertTabletStatement constructStatement() {
+ if (statement != null) {
+ if (Objects.isNull(dataBaseName)) {
+ new
PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
+ } else {
+ new
PipeTableModelTabletEventSorter(statement).sortByTimestampIfNecessary();
+ }
+
+ return statement;
+ }
+
if (Objects.isNull(dataBaseName)) {
new
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
} else {
@@ -86,6 +97,16 @@ public class PipeTransferTabletRawReqV2 extends
PipeTransferTabletRawReq {
return tabletReq;
}
+  /**
+   * Decodes one V2 raw tablet request starting at the current position of {@code buffer} and
+   * leaves the position just past it, so consecutive requests can be read from one shared buffer.
+   * This method does not assign {@code body}; version and type are fixed to VERSION_1 and
+   * TRANSFER_TABLET_RAW_V2.
+   *
+   * @param buffer buffer positioned at a serialized V2 raw tablet request
+   * @return the decoded request
+   */
+  public static PipeTransferTabletRawReqV2 toTPipeTransferRawReq(final ByteBuffer buffer) {
+    final PipeTransferTabletRawReqV2 decoded = new PipeTransferTabletRawReqV2();
+    decoded.deserializeTPipeTransferRawReq(buffer);
+    decoded.type = PipeRequestType.TRANSFER_TABLET_RAW_V2.getType();
+    decoded.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+    return decoded;
+  }
+
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferTabletRawReqV2 toTPipeTransferReq(
@@ -114,13 +135,11 @@ public class PipeTransferTabletRawReqV2 extends
PipeTransferTabletRawReq {
final TPipeTransferReq transferReq) {
final PipeTransferTabletRawReqV2 tabletReq = new
PipeTransferTabletRawReqV2();
- tabletReq.tablet = Tablet.deserialize(transferReq.body);
- tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body);
- tabletReq.dataBaseName = ReadWriteIOUtils.readString(transferReq.body);
+ tabletReq.deserializeTPipeTransferRawReq(transferReq.body);
+ tabletReq.body = transferReq.body;
tabletReq.version = transferReq.version;
tabletReq.type = transferReq.type;
- tabletReq.body = transferReq.body;
return tabletReq;
}
@@ -161,4 +180,27 @@ public class PipeTransferTabletRawReqV2 extends
PipeTransferTabletRawReq {
public int hashCode() {
return Objects.hash(super.hashCode(), dataBaseName);
}
+
+ /////////////////////////////// Util ///////////////////////////////
+
+  /**
+   * Populates this request from a V2 raw tablet payload. The payload is first decoded straight
+   * into an {@link InsertTabletStatement}; if that throws, the buffer is rewound and decoded again
+   * with {@link Tablet#deserialize}, followed by the isAligned flag and the database name.
+   *
+   * @param buffer buffer positioned at the serialized tablet; its position is advanced past it
+   */
+  public void deserializeTPipeTransferRawReq(final ByteBuffer buffer) {
+    final int startPosition = buffer.position();
+    try {
+      // V2: read databaseName, readDatabaseName = true
+      final InsertTabletStatement insertTabletStatement =
+          TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, true);
+      this.isAligned = insertTabletStatement.isAligned();
+      // databaseName is already set in deserializeStatementFromTabletFormat when
+      // readDatabaseName=true
+      this.dataBaseName = insertTabletStatement.getDatabaseName().orElse(null);
+      this.statement = insertTabletStatement;
+    } catch (final Exception statementFailure) {
+      // If Statement deserialization fails, fallback to Tablet format.
+      // Reset buffer position for Tablet deserialization.
+      buffer.position(startPosition);
+      try {
+        this.tablet = Tablet.deserialize(buffer);
+        this.isAligned = ReadWriteIOUtils.readBool(buffer);
+        this.dataBaseName = ReadWriteIOUtils.readString(buffer);
+      } catch (final RuntimeException fallbackFailure) {
+        // Keep the first failure attached so both decoding attempts show up in the stack trace.
+        fallbackFailure.addSuppressed(statementFailure);
+        throw fallbackFailure;
+      }
+    }
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
new file mode 100644
index 00000000000..c5b9ebed4d5
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.util;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Utility for decoding an {@link InsertTabletStatement} directly from the byte layout written by
+ * tsfile's {@code Tablet} serialization, without building an intermediate {@code Tablet}. Only the
+ * fields the statement needs are kept, and the statement's retained memory is estimated during
+ * decoding and stored through {@code setRamBytesUsed}.
+ */
+public final class TabletStatementConverter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TabletStatementConverter.class);
+
+  // Memory calculation constants, cached from RamUsageEstimator.
+  private static final long NUM_BYTES_ARRAY_HEADER =
+      org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
+  private static final long NUM_BYTES_OBJECT_REF =
+      org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+  private static final long NUM_BYTES_OBJECT_HEADER =
+      org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
+  private static final long SIZE_OF_ARRAYLIST =
+      org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance(java.util.ArrayList.class);
+  private static final long SIZE_OF_BITMAP =
+      org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance(BitMap.class);
+
+  private TabletStatementConverter() {
+    // Utility class, no instantiation
+  }
+
+  /**
+   * Deserializes an InsertTabletStatement from a Tablet-format buffer.
+   *
+   * <p>Fields are read in this order: insert target name, row count, schemas (each schema followed
+   * by one column-category byte), timestamps, bitmaps, values, the isAligned flag and, only when
+   * {@code readDatabaseName} is true, the database name.
+   *
+   * @param byteBuffer buffer positioned at the serialized tablet; its position is advanced past it
+   * @param readDatabaseName whether a database name follows the isAligned flag (V2 format). A
+   *     non-null database name makes the statement a table-model write whose device path is the
+   *     lower-cased target (table) name; otherwise the target name is resolved as a tree-model
+   *     device path through {@link DataNodeDevicePathCache}
+   * @return InsertTabletStatement with all fields set, including devicePath and the estimated
+   *     ramBytesUsed
+   * @throws IllegalPathException if the device path cannot be built from the target name
+   */
+  public static InsertTabletStatement deserializeStatementFromTabletFormat(
+      final ByteBuffer byteBuffer, final boolean readDatabaseName) throws IllegalPathException {
+    final InsertTabletStatement statement = new InsertTabletStatement();
+
+    // Running estimate of the statement's retained size, accumulated while decoding so that the
+    // statement does not have to be measured again afterwards.
+    long memorySize = InsertTabletStatement.getInstanceSize();
+
+    final String insertTargetName = ReadWriteIOUtils.readString(byteBuffer);
+    final int rowSize = ReadWriteIOUtils.readInt(byteBuffer);
+
+    // ---------------------------- schemas ----------------------------
+    final int schemaSize =
+        BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer))
+            ? ReadWriteIOUtils.readInt(byteBuffer)
+            : 0;
+    final String[] measurement = new String[schemaSize];
+    final TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[schemaSize];
+    final TSDataType[] dataTypes = new TSDataType[schemaSize];
+
+    long measurementMemorySize = sizeOfReferenceArray(schemaSize);
+    final long dataTypesMemorySize = sizeOfReferenceArray(schemaSize);
+    final long columnCategoriesMemorySize = sizeOfReferenceArray(schemaSize);
+    // tagColumnIndices: ArrayList shell + backing array header, plus one boxed Integer per TAG
+    long tagColumnIndicesSize = SIZE_OF_ARRAYLIST + NUM_BYTES_ARRAY_HEADER;
+
+    // Enum.values() returns a fresh array on every call, so fetch it once for the whole loop
+    final ColumnCategory[] tsFileColumnCategories = ColumnCategory.values();
+    for (int i = 0; i < schemaSize; i++) {
+      final boolean hasSchema = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+      if (!hasSchema) {
+        continue;
+      }
+      final Pair<String, TSDataType> pair = readMeasurement(byteBuffer);
+      measurement[i] = pair.getLeft();
+      dataTypes[i] = pair.getRight();
+      columnCategories[i] =
+          TsTableColumnCategory.fromTsFileColumnCategory(tsFileColumnCategories[byteBuffer.get()]);
+
+      if (measurement[i] != null) {
+        measurementMemorySize += org.apache.tsfile.utils.RamUsageEstimator.sizeOf(measurement[i]);
+      }
+      if (columnCategories[i] == TsTableColumnCategory.TAG) {
+        tagColumnIndicesSize +=
+            org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+                    Integer.BYTES + NUM_BYTES_OBJECT_HEADER)
+                + NUM_BYTES_OBJECT_REF;
+      }
+    }
+    memorySize += measurementMemorySize + dataTypesMemorySize;
+
+    // ---------------------------- timestamps ----------------------------
+    final long[] times = new long[rowSize];
+    if (BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer))) {
+      for (int i = 0; i < rowSize; i++) {
+        times[i] = ReadWriteIOUtils.readLong(byteBuffer);
+      }
+    }
+    memorySize += sizeOfPrimitiveArray(rowSize, Long.BYTES);
+
+    // ---------------------------- bitmaps ----------------------------
+    final BitMap[] bitMaps;
+    if (BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer))) {
+      final Pair<BitMap[], Long> bitMapsAndMemory =
+          readBitMapsFromBufferWithMemory(byteBuffer, schemaSize);
+      bitMaps = bitMapsAndMemory.getLeft();
+      memorySize += bitMapsAndMemory.getRight();
+    } else {
+      bitMaps = new BitMap[schemaSize];
+      memorySize += sizeOfReferenceArray(schemaSize);
+    }
+
+    // ---------------------------- values ----------------------------
+    final Object[] values;
+    if (BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer))) {
+      final Pair<Object[], Long> valuesAndMemory =
+          readValuesFromBufferWithMemory(byteBuffer, dataTypes, schemaSize, rowSize);
+      values = valuesAndMemory.getLeft();
+      memorySize += valuesAndMemory.getRight();
+    } else {
+      values = new Object[schemaSize];
+      memorySize += sizeOfReferenceArray(schemaSize);
+    }
+
+    final boolean isAligned = ReadWriteIOUtils.readBoolean(byteBuffer);
+
+    statement.setMeasurements(measurement);
+    statement.setTimes(times);
+    statement.setBitMaps(bitMaps);
+    statement.setDataTypes(dataTypes);
+    statement.setColumns(values);
+    statement.setRowCount(rowSize);
+    statement.setAligned(isAligned);
+
+    // Only the V2 format carries a database name; a non-null one marks a table-model write.
+    final String databaseName = readDatabaseName ? ReadWriteIOUtils.readString(byteBuffer) : null;
+    if (databaseName != null) {
+      statement.setDatabaseName(databaseName);
+      statement.setWriteToTable(true);
+      // Table model: the target name is the table name. Lower-case it with Locale.ROOT so the
+      // resulting identifier does not depend on the JVM default locale (e.g. Turkish dotless i).
+      statement.setDevicePath(
+          new PartialPath(insertTargetName.toLowerCase(java.util.Locale.ROOT), false));
+      statement.setColumnCategories(columnCategories);
+      memorySize +=
+          org.apache.tsfile.utils.RamUsageEstimator.sizeOf(databaseName)
+              + columnCategoriesMemorySize
+              + tagColumnIndicesSize;
+    } else {
+      // Tree model (V1, or V2 without a database name): resolve through the shared path cache
+      statement.setDevicePath(
+          DataNodeDevicePathCache.getInstance().getPartialPath(insertTargetName));
+      statement.setColumnCategories(null);
+    }
+
+    memorySize += InsertNodeMemoryEstimator.sizeOfPartialPath(statement.getDevicePath());
+    // Store the pre-computed estimate so the statement does not need to recompute it
+    statement.setRamBytesUsed(memorySize);
+    return statement;
+  }
+
+  /**
+   * Deserializes an InsertTabletStatement from a Tablet-format buffer in the V1 layout (no
+   * database name), resolving the device path through {@link DataNodeDevicePathCache}.
+   *
+   * @param byteBuffer buffer positioned at the serialized tablet
+   * @return InsertTabletStatement with devicePath set
+   * @throws IllegalPathException if the device path cannot be built from the target name
+   */
+  public static InsertTabletStatement deserializeStatementFromTabletFormat(
+      final ByteBuffer byteBuffer) throws IllegalPathException {
+    return deserializeStatementFromTabletFormat(byteBuffer, false);
+  }
+
+  /**
+   * Advances {@code buffer} past a length-prefixed string without decoding it. A non-positive
+   * length (as used for null or empty strings) consumes only the length prefix.
+   *
+   * @param buffer ByteBuffer to skip string from
+   */
+  private static void skipString(final ByteBuffer buffer) {
+    final int size = ReadWriteIOUtils.readInt(buffer);
+    if (size > 0) {
+      buffer.position(buffer.position() + size);
+    }
+  }
+
+  /**
+   * Reads a measurement name and data type, skipping the remaining schema fields (encoding,
+   * compression and the props map) that InsertTabletStatement does not use.
+   *
+   * <p>NOTE(review): the skipped layout (two single-byte enums, then an int-counted map of string
+   * pairs) must stay in sync with tsfile's measurement-schema serialization.
+   *
+   * @param buffer ByteBuffer containing serialized measurement schema
+   * @return Pair of measurement name and data type
+   */
+  private static Pair<String, TSDataType> readMeasurement(final ByteBuffer buffer) {
+    final String measurementName = ReadWriteIOUtils.readString(buffer);
+    final TSDataType dataType = TSDataType.deserializeFrom(buffer);
+
+    // Skip encoding type (byte) and compression type (byte) - 2 bytes total
+    buffer.position(buffer.position() + 2);
+
+    // Skip the props map (Map<String, String>) entry by entry without materializing strings
+    final int propsSize = ReadWriteIOUtils.readInt(buffer);
+    for (int i = 0; i < propsSize; i++) {
+      skipString(buffer);
+      skipString(buffer);
+    }
+    return new Pair<>(measurementName, dataType);
+  }
+
+  /**
+   * Deserializes per-column bitmaps, estimating their memory on the way.
+   *
+   * @param byteBuffer buffer positioned at the first per-column bitmap flag
+   * @param columns number of columns
+   * @return Pair of the BitMap array (null entries for columns without a bitmap) and memory size
+   */
+  private static Pair<BitMap[], Long> readBitMapsFromBufferWithMemory(
+      final ByteBuffer byteBuffer, final int columns) {
+    final BitMap[] bitMaps = new BitMap[columns];
+    long memorySize = sizeOfReferenceArray(columns);
+
+    for (int i = 0; i < columns; i++) {
+      final boolean hasBitMap = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+      if (!hasBitMap) {
+        continue;
+      }
+      final int size = ReadWriteIOUtils.readInt(byteBuffer);
+      final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer);
+      final byte[] byteArray = valueBinary.getValues();
+      bitMaps[i] = new BitMap(size, byteArray);
+      // BitMap shell + its backing byte array
+      memorySize += SIZE_OF_BITMAP + sizeOfPrimitiveArray(byteArray.length, Byte.BYTES);
+    }
+
+    return new Pair<>(bitMaps, memorySize);
+  }
+
+  /**
+   * Deserializes column values, estimating their memory on the way. Columns whose data type is
+   * null (no schema) are left as null entries.
+   *
+   * @param byteBuffer data values
+   * @param types data types
+   * @param columns column number
+   * @param rowSize row number
+   * @return Pair of values array and memory size
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  private static Pair<Object[], Long> readValuesFromBufferWithMemory(
+      final ByteBuffer byteBuffer, final TSDataType[] types, final int columns, final int rowSize) {
+    final Object[] values = new Object[columns];
+    long memorySize = sizeOfReferenceArray(columns);
+
+    for (int i = 0; i < columns; i++) {
+      final boolean isValueColumnsNotNull =
+          BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+      if (types[i] == null) {
+        // No schema for this column: leave values[i] null. Checking the type regardless of the
+        // presence flag keeps an absent, schema-less column away from switch (null), which
+        // would throw a NullPointerException.
+        continue;
+      }
+
+      switch (types[i]) {
+        case BOOLEAN:
+          final boolean[] boolValues = new boolean[rowSize];
+          if (isValueColumnsNotNull) {
+            for (int index = 0; index < rowSize; index++) {
+              boolValues[index] = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+            }
+          }
+          values[i] = boolValues;
+          memorySize += sizeOfPrimitiveArray(rowSize, 1);
+          break;
+        case INT32:
+        case DATE:
+          final int[] intValues = new int[rowSize];
+          if (isValueColumnsNotNull) {
+            for (int index = 0; index < rowSize; index++) {
+              intValues[index] = ReadWriteIOUtils.readInt(byteBuffer);
+            }
+          }
+          values[i] = intValues;
+          memorySize += sizeOfPrimitiveArray(rowSize, Integer.BYTES);
+          break;
+        case INT64:
+        case TIMESTAMP:
+          final long[] longValues = new long[rowSize];
+          if (isValueColumnsNotNull) {
+            for (int index = 0; index < rowSize; index++) {
+              longValues[index] = ReadWriteIOUtils.readLong(byteBuffer);
+            }
+          }
+          values[i] = longValues;
+          memorySize += sizeOfPrimitiveArray(rowSize, Long.BYTES);
+          break;
+        case FLOAT:
+          final float[] floatValues = new float[rowSize];
+          if (isValueColumnsNotNull) {
+            for (int index = 0; index < rowSize; index++) {
+              floatValues[index] = ReadWriteIOUtils.readFloat(byteBuffer);
+            }
+          }
+          values[i] = floatValues;
+          memorySize += sizeOfPrimitiveArray(rowSize, Float.BYTES);
+          break;
+        case DOUBLE:
+          final double[] doubleValues = new double[rowSize];
+          if (isValueColumnsNotNull) {
+            for (int index = 0; index < rowSize; index++) {
+              doubleValues[index] = ReadWriteIOUtils.readDouble(byteBuffer);
+            }
+          }
+          values[i] = doubleValues;
+          memorySize += sizeOfPrimitiveArray(rowSize, Double.BYTES);
+          break;
+        case TEXT:
+        case STRING:
+        case BLOB:
+        case OBJECT:
+          final Binary[] binaryValues = new Binary[rowSize];
+          long binaryArrayMemory = sizeOfReferenceArray(rowSize);
+          if (isValueColumnsNotNull) {
+            for (int index = 0; index < rowSize; index++) {
+              final boolean isNotNull =
+                  BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+              // Null cells are represented by Binary.EMPTY_VALUE, which is also counted
+              binaryValues[index] =
+                  isNotNull ? ReadWriteIOUtils.readBinary(byteBuffer) : Binary.EMPTY_VALUE;
+              binaryArrayMemory += binaryValues[index].ramBytesUsed();
+            }
+          } else {
+            Arrays.fill(binaryValues, Binary.EMPTY_VALUE);
+            binaryArrayMemory += (long) rowSize * Binary.EMPTY_VALUE.ramBytesUsed();
+          }
+          values[i] = binaryValues;
+          memorySize += binaryArrayMemory;
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("data type %s is not supported when convert data at client", types[i]));
+      }
+    }
+
+    return new Pair<>(values, memorySize);
+  }
+
+  /** Aligned shallow size of an array holding {@code length} object references. */
+  private static long sizeOfReferenceArray(final int length) {
+    return org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+        NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * length);
+  }
+
+  /** Aligned size of a primitive array of {@code length} elements, {@code elementBytes} each. */
+  private static long sizeOfPrimitiveArray(final int length, final int elementBytes) {
+    return org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+        NUM_BYTES_ARRAY_HEADER + (long) elementBytes * length);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertEventDataAdapter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertEventDataAdapter.java
new file mode 100644
index 00000000000..62111cb4129
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertEventDataAdapter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.util.sorter;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.BitMap;
+
+/**
+ * Adapter interface to encapsulate common operations needed for sorting and
deduplication. This
+ * interface allows the sorter to work with both Tablet and
InsertTabletStatement.
+ */
+public interface InsertEventDataAdapter {
+
+ /**
+ * Get the number of columns.
+ *
+ * @return number of columns
+ */
+ int getColumnCount();
+
+ /**
+ * Get data type for a specific column.
+ *
+ * @param columnIndex column index
+ * @return data type of the column
+ */
+ TSDataType getDataType(int columnIndex);
+
+ /**
+ * Get bit maps for null values.
+ *
+ * @return array of bit maps, may be null
+ */
+ BitMap[] getBitMaps();
+
+ /**
+ * Set bit maps for null values.
+ *
+ * @param bitMaps array of bit maps
+ */
+ void setBitMaps(BitMap[] bitMaps);
+
+ /**
+ * Get value arrays for all columns.
+ *
+ * @return array of value arrays (Object[])
+ */
+ Object[] getValues();
+
+ /**
+ * Set value array for a specific column.
+ *
+ * @param columnIndex column index
+ * @param value value array
+ */
+ void setValue(int columnIndex, Object value);
+
+ /**
+ * Get timestamps array.
+ *
+ * @return array of timestamps
+ */
+ long[] getTimestamps();
+
+ /**
+ * Set timestamps array.
+ *
+ * @param timestamps array of timestamps
+ */
+ void setTimestamps(long[] timestamps);
+
+ /**
+ * Get row size/count.
+ *
+ * @return number of rows
+ */
+ int getRowSize();
+
+ /**
+ * Set row size/count.
+ *
+ * @param rowSize number of rows
+ */
+ void setRowSize(int rowSize);
+
+ /**
+ * Get timestamp at a specific row index.
+ *
+ * @param rowIndex row index
+ * @return timestamp value
+ */
+ long getTimestamp(int rowIndex);
+
+ /**
+ * Get device ID at a specific row index (for table model).
+ *
+ * @param rowIndex row index
+ * @return device ID
+ */
+ IDeviceID getDeviceID(int rowIndex);
+
+ /**
+ * Check if the DATE type column value is stored as LocalDate[] (Tablet) or
int[] (Statement).
+ *
+ * @param columnIndex column index
+ * @return true if DATE type is stored as LocalDate[], false if stored as
int[]
+ */
+ boolean isDateStoredAsLocalDate(int columnIndex);
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertTabletStatementAdapter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertTabletStatementAdapter.java
new file mode 100644
index 00000000000..30f74a22965
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertTabletStatementAdapter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.util.sorter;
+
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.BitMap;
+
+/** Adapter for InsertTabletStatement to implement InsertEventDataAdapter
interface. */
+public class InsertTabletStatementAdapter implements InsertEventDataAdapter {
+
+ private final InsertTabletStatement statement;
+
+ public InsertTabletStatementAdapter(final InsertTabletStatement statement) {
+ this.statement = statement;
+ }
+
+ @Override
+ public int getColumnCount() {
+ final Object[] columns = statement.getColumns();
+ return columns != null ? columns.length : 0;
+ }
+
+ @Override
+ public TSDataType getDataType(int columnIndex) {
+ final TSDataType[] dataTypes = statement.getDataTypes();
+ if (dataTypes != null && columnIndex < dataTypes.length) {
+ return dataTypes[columnIndex];
+ }
+ return null;
+ }
+
+ @Override
+ public BitMap[] getBitMaps() {
+ return statement.getBitMaps();
+ }
+
+ @Override
+ public void setBitMaps(BitMap[] bitMaps) {
+ statement.setBitMaps(bitMaps);
+ }
+
+ @Override
+ public Object[] getValues() {
+ return statement.getColumns();
+ }
+
+ @Override
+ public void setValue(int columnIndex, Object value) {
+ Object[] columns = statement.getColumns();
+ if (columns != null && columnIndex < columns.length) {
+ columns[columnIndex] = value;
+ }
+ }
+
+ @Override
+ public long[] getTimestamps() {
+ return statement.getTimes();
+ }
+
+ @Override
+ public void setTimestamps(long[] timestamps) {
+ statement.setTimes(timestamps);
+ }
+
+ @Override
+ public int getRowSize() {
+ return statement.getRowCount();
+ }
+
+ @Override
+ public void setRowSize(int rowSize) {
+ statement.setRowCount(rowSize);
+ }
+
+ @Override
+ public long getTimestamp(int rowIndex) {
+ long[] times = statement.getTimes();
+ if (times != null && rowIndex < times.length) {
+ return times[rowIndex];
+ }
+ return 0;
+ }
+
+ @Override
+ public IDeviceID getDeviceID(int rowIndex) {
+ return statement.getTableDeviceID(rowIndex);
+ }
+
+ @Override
+ public boolean isDateStoredAsLocalDate(int columnIndex) {
+ // InsertTabletStatement stores DATE as int[]
+ return false;
+ }
+
+ public InsertTabletStatement getStatement() {
+ return statement;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java
similarity index 65%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java
index 4ad64ae278b..817df1c06e0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java
@@ -19,19 +19,20 @@
package org.apache.iotdb.db.pipe.sink.util.sorter;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.time.LocalDate;
import java.util.Objects;
-public class PipeTabletEventSorter {
+public class PipeInsertEventSorter {
- protected final Tablet tablet;
+ protected final InsertEventDataAdapter dataAdapter;
protected Integer[] index;
protected boolean isSorted = true;
@@ -39,8 +40,31 @@ public class PipeTabletEventSorter {
protected int[] deDuplicatedIndex;
protected int deDuplicatedSize;
- public PipeTabletEventSorter(final Tablet tablet) {
- this.tablet = tablet;
+ /**
+ * Constructor for Tablet.
+ *
+ * @param tablet the tablet to sort
+ */
+ public PipeInsertEventSorter(final Tablet tablet) {
+ this.dataAdapter = new TabletAdapter(tablet);
+ }
+
+ /**
+ * Constructor for InsertTabletStatement.
+ *
+ * @param statement the insert tablet statement to sort
+ */
+ public PipeInsertEventSorter(final InsertTabletStatement statement) {
+ this.dataAdapter = new InsertTabletStatementAdapter(statement);
+ }
+
+ /**
+ * Constructor with adapter (for internal use or advanced scenarios).
+ *
+ * @param adapter the data adapter
+ */
+ protected PipeInsertEventSorter(final InsertEventDataAdapter adapter) {
+ this.dataAdapter = adapter;
}
// Input:
@@ -54,35 +78,42 @@ public class PipeTabletEventSorter {
// (Used index: [2(3), 4(0)])
// Col: [6, 1]
protected void sortAndMayDeduplicateValuesAndBitMaps() {
- int columnIndex = 0;
- for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) {
- final IMeasurementSchema schema = tablet.getSchemas().get(i);
- if (schema != null) {
+ final int columnCount = dataAdapter.getColumnCount();
+ BitMap[] bitMaps = dataAdapter.getBitMaps();
+ boolean bitMapsModified = false;
+
+ for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
+ final TSDataType dataType = dataAdapter.getDataType(columnIndex);
+ if (dataType != null) {
BitMap deDuplicatedBitMap = null;
BitMap originalBitMap = null;
- if (tablet.getBitMaps() != null && tablet.getBitMaps()[columnIndex] !=
null) {
- originalBitMap = tablet.getBitMaps()[columnIndex];
+ if (bitMaps != null && columnIndex < bitMaps.length &&
bitMaps[columnIndex] != null) {
+ originalBitMap = bitMaps[columnIndex];
deDuplicatedBitMap = new BitMap(originalBitMap.getSize());
}
- tablet.getValues()[columnIndex] =
+ final Object[] values = dataAdapter.getValues();
+ final Object reorderedValue =
reorderValueListAndBitMap(
- tablet.getValues()[columnIndex],
- schema.getType(),
- originalBitMap,
- deDuplicatedBitMap);
+ values[columnIndex], dataType, columnIndex, originalBitMap,
deDuplicatedBitMap);
+ dataAdapter.setValue(columnIndex, reorderedValue);
- if (tablet.getBitMaps() != null && tablet.getBitMaps()[columnIndex] !=
null) {
- tablet.getBitMaps()[columnIndex] = deDuplicatedBitMap;
+ if (bitMaps != null && columnIndex < bitMaps.length &&
bitMaps[columnIndex] != null) {
+ bitMaps[columnIndex] = deDuplicatedBitMap;
+ bitMapsModified = true;
}
- columnIndex++;
}
}
+
+ if (bitMapsModified) {
+ dataAdapter.setBitMaps(bitMaps);
+ }
}
protected Object reorderValueListAndBitMap(
final Object valueList,
final TSDataType dataType,
+ final int columnIndex,
final BitMap originalBitMap,
final BitMap deDuplicatedBitMap) {
// Older version's sender may contain null values, we need to cover this
case
@@ -107,13 +138,26 @@ public class PipeTabletEventSorter {
}
return deDuplicatedIntValues;
case DATE:
- final LocalDate[] dateValues = (LocalDate[]) valueList;
- final LocalDate[] deDuplicatedDateValues = new
LocalDate[dateValues.length];
- for (int i = 0; i < deDuplicatedSize; i++) {
- deDuplicatedDateValues[i] =
- dateValues[getLastNonnullIndex(i, originalBitMap,
deDuplicatedBitMap)];
+ // DATE type: Tablet uses LocalDate[], InsertTabletStatement uses int[]
+ if (dataAdapter.isDateStoredAsLocalDate(columnIndex)) {
+ // Tablet: LocalDate[]
+ final LocalDate[] dateValues = (LocalDate[]) valueList;
+ final LocalDate[] deDuplicatedDateValues = new
LocalDate[dateValues.length];
+ for (int i = 0; i < deDuplicatedSize; i++) {
+ deDuplicatedDateValues[i] =
+ dateValues[getLastNonnullIndex(i, originalBitMap,
deDuplicatedBitMap)];
+ }
+ return deDuplicatedDateValues;
+ } else {
+ // InsertTabletStatement: int[]
+ final int[] intDateValues = (int[]) valueList;
+ final int[] deDuplicatedIntDateValues = new
int[intDateValues.length];
+ for (int i = 0; i < deDuplicatedSize; i++) {
+ deDuplicatedIntDateValues[i] =
+ intDateValues[getLastNonnullIndex(i, originalBitMap,
deDuplicatedBitMap)];
+ }
+ return deDuplicatedIntDateValues;
}
- return deDuplicatedDateValues;
case INT64:
case TIMESTAMP:
final long[] longValues = (long[]) valueList;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTableModelTabletEventSorter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTableModelTabletEventSorter.java
index 5735b51c6d0..ba034cae3a3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTableModelTabletEventSorter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTableModelTabletEventSorter.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.sink.util.sorter;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
@@ -31,14 +33,29 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class PipeTableModelTabletEventSorter extends PipeTabletEventSorter {
+public class PipeTableModelTabletEventSorter extends PipeInsertEventSorter {
private int initIndexSize;
+ /**
+ * Constructor for Tablet.
+ *
+ * @param tablet the tablet to sort
+ */
public PipeTableModelTabletEventSorter(final Tablet tablet) {
super(tablet);
deDuplicatedSize = tablet == null ? 0 : tablet.getRowSize();
}
+ /**
+ * Constructor for InsertTabletStatement.
+ *
+ * @param statement the insert tablet statement to sort
+ */
+ public PipeTableModelTabletEventSorter(final InsertTabletStatement
statement) {
+ super(statement);
+ deDuplicatedSize = statement == null ? 0 : statement.getRowCount();
+ }
+
/**
* For the sorting and deduplication needs of the table model tablet, it is
done according to the
* {@link IDeviceID}. For sorting, it is necessary to sort the {@link
IDeviceID} first, and then
@@ -46,18 +63,19 @@ public class PipeTableModelTabletEventSorter extends
PipeTabletEventSorter {
* the same timestamp in different {@link IDeviceID} will not be processed.
*/
public void sortAndDeduplicateByDevIdTimestamp() {
- if (tablet == null || tablet.getRowSize() < 1) {
+ if (dataAdapter == null || dataAdapter.getRowSize() < 1) {
return;
}
final HashMap<IDeviceID, List<Pair<Integer, Integer>>> deviceIDToIndexMap
= new HashMap<>();
- final long[] timestamps = tablet.getTimestamps();
+ final long[] timestamps = dataAdapter.getTimestamps();
+ final int rowSize = dataAdapter.getRowSize();
- IDeviceID lastDevice = tablet.getDeviceID(0);
- long previousTimestamp = tablet.getTimestamp(0);
+ IDeviceID lastDevice = dataAdapter.getDeviceID(0);
+ long previousTimestamp = dataAdapter.getTimestamp(0);
int lasIndex = 0;
- for (int i = 1, size = tablet.getRowSize(); i < size; ++i) {
- final IDeviceID deviceID = tablet.getDeviceID(i);
+ for (int i = 1; i < rowSize; ++i) {
+ final IDeviceID deviceID = dataAdapter.getDeviceID(i);
final long currentTimestamp = timestamps[i];
final int deviceComparison = deviceID.compareTo(lastDevice);
if (deviceComparison == 0) {
@@ -92,7 +110,7 @@ public class PipeTableModelTabletEventSorter extends
PipeTabletEventSorter {
if (!list.isEmpty()) {
isSorted = false;
}
- list.add(new Pair<>(lasIndex, tablet.getRowSize()));
+ list.add(new Pair<>(lasIndex, rowSize));
if (isSorted && isDeDuplicated) {
return;
@@ -100,8 +118,8 @@ public class PipeTableModelTabletEventSorter extends
PipeTabletEventSorter {
initIndexSize = 0;
deDuplicatedSize = 0;
- index = new Integer[tablet.getRowSize()];
- deDuplicatedIndex = new int[tablet.getRowSize()];
+ index = new Integer[rowSize];
+ deDuplicatedIndex = new int[rowSize];
deviceIDToIndexMap.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.forEach(
@@ -129,19 +147,22 @@ public class PipeTableModelTabletEventSorter extends
PipeTabletEventSorter {
}
private void sortAndDeduplicateValuesAndBitMapsWithTimestamp() {
- tablet.setTimestamps(
+ // TIMESTAMP is not a DATE type, so columnIndex is not relevant here, use
-1
+ dataAdapter.setTimestamps(
(long[])
- reorderValueListAndBitMap(tablet.getTimestamps(),
TSDataType.TIMESTAMP, null, null));
+ reorderValueListAndBitMap(
+ dataAdapter.getTimestamps(), TSDataType.TIMESTAMP, -1, null,
null));
sortAndMayDeduplicateValuesAndBitMaps();
- tablet.setRowSize(deDuplicatedSize);
+ dataAdapter.setRowSize(deDuplicatedSize);
}
private void sortTimestamps(final int startIndex, final int endIndex) {
- Arrays.sort(this.index, startIndex, endIndex,
Comparator.comparingLong(tablet::getTimestamp));
+ Arrays.sort(
+ this.index, startIndex, endIndex,
Comparator.comparingLong(dataAdapter::getTimestamp));
}
private void deDuplicateTimestamps(final int startIndex, final int endIndex)
{
- final long[] timestamps = tablet.getTimestamps();
+ final long[] timestamps = dataAdapter.getTimestamps();
long lastTime = timestamps[index[startIndex]];
for (int i = startIndex + 1; i < endIndex; i++) {
if (lastTime != (lastTime = timestamps[index[i]])) {
@@ -153,12 +174,13 @@ public class PipeTableModelTabletEventSorter extends
PipeTabletEventSorter {
/** Sort by time only. */
public void sortByTimestampIfNecessary() {
- if (tablet == null || tablet.getRowSize() == 0) {
+ if (dataAdapter == null || dataAdapter.getRowSize() == 0) {
return;
}
- final long[] timestamps = tablet.getTimestamps();
- for (int i = 1, size = tablet.getRowSize(); i < size; ++i) {
+ final long[] timestamps = dataAdapter.getTimestamps();
+ final int rowSize = dataAdapter.getRowSize();
+ for (int i = 1; i < rowSize; ++i) {
final long currentTimestamp = timestamps[i];
final long previousTimestamp = timestamps[i - 1];
@@ -172,8 +194,8 @@ public class PipeTableModelTabletEventSorter extends
PipeTabletEventSorter {
return;
}
- index = new Integer[tablet.getRowSize()];
- for (int i = 0, size = tablet.getRowSize(); i < size; i++) {
+ index = new Integer[rowSize];
+ for (int i = 0; i < rowSize; i++) {
index[i] = i;
}
@@ -185,7 +207,8 @@ public class PipeTableModelTabletEventSorter extends
PipeTabletEventSorter {
}
private void sortTimestamps() {
- Arrays.sort(this.index, Comparator.comparingLong(tablet::getTimestamp));
- Arrays.sort(tablet.getTimestamps(), 0, tablet.getRowSize());
+ Arrays.sort(this.index,
Comparator.comparingLong(dataAdapter::getTimestamp));
+ final long[] timestamps = dataAdapter.getTimestamps();
+ Arrays.sort(timestamps, 0, dataAdapter.getRowSize());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTreeModelTabletEventSorter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTreeModelTabletEventSorter.java
index c26f59220f9..2a56b706463 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTreeModelTabletEventSorter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTreeModelTabletEventSorter.java
@@ -19,25 +19,43 @@
package org.apache.iotdb.db.pipe.sink.util.sorter;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
import org.apache.tsfile.write.record.Tablet;
import java.util.Arrays;
import java.util.Comparator;
-public class PipeTreeModelTabletEventSorter extends PipeTabletEventSorter {
+public class PipeTreeModelTabletEventSorter extends PipeInsertEventSorter {
+ /**
+ * Constructor for Tablet.
+ *
+ * @param tablet the tablet to sort
+ */
public PipeTreeModelTabletEventSorter(final Tablet tablet) {
super(tablet);
deDuplicatedSize = tablet == null ? 0 : tablet.getRowSize();
}
+ /**
+ * Constructor for InsertTabletStatement.
+ *
+ * @param statement the insert tablet statement to sort
+ */
+ public PipeTreeModelTabletEventSorter(final InsertTabletStatement statement)
{
+ super(statement);
+ deDuplicatedSize = statement == null ? 0 : statement.getRowCount();
+ }
+
public void deduplicateAndSortTimestampsIfNecessary() {
- if (tablet == null || tablet.getRowSize() == 0) {
+ if (dataAdapter == null || dataAdapter.getRowSize() == 0) {
return;
}
- long[] timestamps = tablet.getTimestamps();
- for (int i = 1, size = tablet.getRowSize(); i < size; ++i) {
+ long[] timestamps = dataAdapter.getTimestamps();
+ final int rowSize = dataAdapter.getRowSize();
+ for (int i = 1; i < rowSize; ++i) {
final long currentTimestamp = timestamps[i];
final long previousTimestamp = timestamps[i - 1];
@@ -54,9 +72,9 @@ public class PipeTreeModelTabletEventSorter extends
PipeTabletEventSorter {
return;
}
- index = new Integer[tablet.getRowSize()];
- deDuplicatedIndex = new int[tablet.getRowSize()];
- for (int i = 0, size = tablet.getRowSize(); i < size; i++) {
+ index = new Integer[rowSize];
+ deDuplicatedIndex = new int[rowSize];
+ for (int i = 0; i < rowSize; i++) {
index[i] = i;
}
@@ -78,14 +96,16 @@ public class PipeTreeModelTabletEventSorter extends
PipeTabletEventSorter {
private void sortTimestamps() {
// Index is sorted stably because it is Integer[]
- Arrays.sort(index, Comparator.comparingLong(tablet::getTimestamp));
- Arrays.sort(tablet.getTimestamps(), 0, tablet.getRowSize());
+ Arrays.sort(index, Comparator.comparingLong(dataAdapter::getTimestamp));
+ final long[] timestamps = dataAdapter.getTimestamps();
+ Arrays.sort(timestamps, 0, dataAdapter.getRowSize());
}
private void deduplicateTimestamps() {
deDuplicatedSize = 0;
- long[] timestamps = tablet.getTimestamps();
- for (int i = 1, size = tablet.getRowSize(); i < size; i++) {
+ long[] timestamps = dataAdapter.getTimestamps();
+ final int rowSize = dataAdapter.getRowSize();
+ for (int i = 1; i < rowSize; i++) {
if (timestamps[i] != timestamps[i - 1]) {
deDuplicatedIndex[deDuplicatedSize] = i - 1;
timestamps[deDuplicatedSize] = timestamps[i - 1];
@@ -94,8 +114,8 @@ public class PipeTreeModelTabletEventSorter extends
PipeTabletEventSorter {
}
}
- deDuplicatedIndex[deDuplicatedSize] = tablet.getRowSize() - 1;
- timestamps[deDuplicatedSize] = timestamps[tablet.getRowSize() - 1];
- tablet.setRowSize(++deDuplicatedSize);
+ deDuplicatedIndex[deDuplicatedSize] = rowSize - 1;
+ timestamps[deDuplicatedSize] = timestamps[rowSize - 1];
+ dataAdapter.setRowSize(++deDuplicatedSize);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/TabletAdapter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/TabletAdapter.java
new file mode 100644
index 00000000000..b200127a5d4
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/TabletAdapter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.util.sorter;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.List;
+
+/** Adapter for Tablet to implement InsertEventDataAdapter interface. */
+public class TabletAdapter implements InsertEventDataAdapter {
+
+ private final Tablet tablet;
+
+ public TabletAdapter(final Tablet tablet) {
+ this.tablet = tablet;
+ }
+
+ @Override
+ public int getColumnCount() {
+ final Object[] values = tablet.getValues();
+ return values != null ? values.length : 0;
+ }
+
+ @Override
+ public TSDataType getDataType(int columnIndex) {
+ final List<IMeasurementSchema> schemas = tablet.getSchemas();
+ if (schemas != null && columnIndex < schemas.size()) {
+ final IMeasurementSchema schema = schemas.get(columnIndex);
+ return schema != null ? schema.getType() : null;
+ }
+ return null;
+ }
+
+ @Override
+ public BitMap[] getBitMaps() {
+ return tablet.getBitMaps();
+ }
+
+ @Override
+ public void setBitMaps(BitMap[] bitMaps) {
+ tablet.setBitMaps(bitMaps);
+ }
+
+ @Override
+ public Object[] getValues() {
+ return tablet.getValues();
+ }
+
+ @Override
+ public void setValue(int columnIndex, Object value) {
+ tablet.getValues()[columnIndex] = value;
+ }
+
+ @Override
+ public long[] getTimestamps() {
+ return tablet.getTimestamps();
+ }
+
+ @Override
+ public void setTimestamps(long[] timestamps) {
+ tablet.setTimestamps(timestamps);
+ }
+
+ @Override
+ public int getRowSize() {
+ return tablet.getRowSize();
+ }
+
+ @Override
+ public void setRowSize(int rowSize) {
+ tablet.setRowSize(rowSize);
+ }
+
+ @Override
+ public long getTimestamp(int rowIndex) {
+ return tablet.getTimestamp(rowIndex);
+ }
+
+ @Override
+ public IDeviceID getDeviceID(int rowIndex) {
+ return tablet.getDeviceID(rowIndex);
+ }
+
+ @Override
+ public boolean isDateStoredAsLocalDate(int columnIndex) {
+ return true;
+ }
+
+ public Tablet getTablet() {
+ return tablet;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index 1aae871ea0c..d8786e33959 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -839,6 +839,16 @@ public abstract class InsertBaseStatement extends
Statement implements Accountab
return ramBytesUsed;
}
+ /**
+ * Set the pre-calculated memory size. This is used when memory size is
calculated during
+ * deserialization to avoid recalculation.
+ *
+ * @param ramBytesUsed the calculated memory size in bytes
+ */
+ public void setRamBytesUsed(long ramBytesUsed) {
+ this.ramBytesUsed = ramBytesUsed;
+ }
+
private long shallowSizeOfList(List<?> list) {
return Objects.nonNull(list)
? UpdateDetailContainer.LIST_SIZE
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index d2c1d2a783a..12369d81bfd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -44,6 +44,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
@@ -58,6 +59,8 @@ import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.time.LocalDate;
import java.util.ArrayList;
@@ -69,11 +72,22 @@ import java.util.Map;
import java.util.Objects;
public class InsertTabletStatement extends InsertBaseStatement implements
ISchemaValidation {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InsertTabletStatement.class);
+
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(InsertTabletStatement.class);
private static final String DATATYPE_UNSUPPORTED = "Data type %s is not
supported.";
+ /**
+ * Get the instance size of InsertTabletStatement for memory calculation.
+ *
+ * @return instance size in bytes
+ */
+ public static long getInstanceSize() {
+ return INSTANCE_SIZE;
+ }
+
protected long[] times; // times should be sorted. It is done in the session
API.
protected BitMap[] nullBitMaps;
protected Object[] columns;
@@ -701,6 +715,189 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
}
+  /**
+   * Converts this statement into a {@link Tablet} that holds a copy of its data.
+   *
+   * <p>Columns whose measurement name or data type is {@code null} are left out. Timestamps,
+   * column values and null bitmaps of the remaining columns are copied and truncated to the row
+   * count, so later changes to this statement do not leak into the returned Tablet.
+   *
+   * <p>If the times array is {@code null} or holds fewer entries than {@link #getRowCount()}, a
+   * warning is logged and only the rows that have a timestamp are kept. The returned Tablet's
+   * rowSize therefore always matches the length of its timestamps array.
+   *
+   * @return a Tablet named after the device path (empty string when the path is {@code null})
+   * @throws MetadataException if the conversion fails
+   */
+  public Tablet convertToTablet() throws MetadataException {
+    try {
+      // The device path carries the device ID or the table name.
+      final String deviceIdOrTableName =
+          this.getDevicePath() != null ? this.getDevicePath().getFullPath() : "";
+
+      final String[] measurements = this.getMeasurements();
+      final TSDataType[] dataTypes = this.getDataTypes();
+      // The measurements array defines how many columns the statement has.
+      final int originalSchemaSize = measurements != null ? measurements.length : 0;
+
+      // Build the schemas and remember which statement column each one came from. Columns with a
+      // null measurement or data type are skipped entirely.
+      final List<IMeasurementSchema> schemas = new ArrayList<>(originalSchemaSize);
+      final List<Integer> validColumnIndices = new ArrayList<>(originalSchemaSize);
+      for (int i = 0; i < originalSchemaSize; i++) {
+        if (dataTypes != null && measurements[i] != null && dataTypes[i] != null) {
+          schemas.add(new MeasurementSchema(measurements[i], dataTypes[i]));
+          validColumnIndices.add(i);
+        }
+      }
+      final int schemaSize = schemas.size();
+
+      // Column categories (used by the table model): a missing or out-of-range entry becomes FIELD.
+      final TsTableColumnCategory[] columnCategories = this.getColumnCategories();
+      final List<ColumnCategory> tabletColumnTypes = new ArrayList<>(schemaSize);
+      for (final int validIndex : validColumnIndices) {
+        final TsTableColumnCategory category =
+            columnCategories != null && validIndex < columnCategories.length
+                ? columnCategories[validIndex]
+                : null;
+        tabletColumnTypes.add(
+            category != null ? category.toTsFileColumnType() : ColumnCategory.FIELD);
+      }
+
+      // Timestamps: every Tablet row needs one. Clamp the row count to the timestamps that exist
+      // instead of building a Tablet whose rowSize exceeds its timestamps array.
+      final long[] statementTimes = this.getTimes();
+      final int declaredRowSize = this.getRowCount();
+      final int availableTimes = statementTimes != null ? statementTimes.length : 0;
+      if (declaredRowSize > availableTimes) {
+        LOGGER.warn(
+            "Times array is null or too small. times.length={}, rowSize={}, deviceId={}",
+            availableTimes,
+            declaredRowSize,
+            deviceIdOrTableName);
+      }
+      final int rowSize = Math.max(0, Math.min(declaredRowSize, availableTimes));
+      final long[] timestamps = rowSize > 0 ? Arrays.copyOf(statementTimes, rowSize) : new long[0];
+
+      // Values: copy only the kept columns, truncated to rowSize.
+      final Object[] statementColumns = this.getColumns();
+      final Object[] tabletValues = new Object[schemaSize];
+      if (statementColumns != null && statementColumns.length > 0) {
+        for (int i = 0; i < schemaSize; i++) {
+          final int originalIndex = validColumnIndices.get(i);
+          // convertColumnToTablet returns null for a null column.
+          tabletValues[i] =
+              convertColumnToTablet(
+                  statementColumns[originalIndex], dataTypes[originalIndex], rowSize);
+        }
+      }
+
+      // Null bitmaps: copy only the kept columns, truncated to rowSize.
+      final BitMap[] originalBitMaps = this.getBitMaps();
+      BitMap[] bitMaps = null;
+      if (originalBitMaps != null && originalBitMaps.length > 0) {
+        bitMaps = new BitMap[schemaSize];
+        for (int i = 0; i < schemaSize; i++) {
+          final BitMap originalBitMap = originalBitMaps[validColumnIndices.get(i)];
+          if (originalBitMap != null) {
+            // A fresh BitMap of rowSize bits that does not share storage with the statement.
+            bitMaps[i] = new BitMap(rowSize, originalBitMap.getTruncatedByteArray(rowSize));
+          }
+        }
+      }
+
+      return new Tablet(
+          deviceIdOrTableName,
+          schemas,
+          tabletColumnTypes,
+          timestamps,
+          tabletValues,
+          bitMaps,
+          rowSize);
+    } catch (final Exception e) {
+      throw new MetadataException("Failed to convert InsertTabletStatement to Tablet", e);
+    }
+  }
+
+  /**
+   * Turns one statement column into the array form a {@link Tablet} expects.
+   *
+   * <p>A DATE column arrives as {@code int[]} and is decoded into a {@code LocalDate[]} of length
+   * {@code rowSize}. Rows beyond the end of the source array stay {@code null}. Boolean, int,
+   * long, float, double and {@link Binary} arrays are copied to at most {@code rowSize} entries,
+   * so the result never shares storage with the statement.
+   *
+   * @param columnValue the statement's column array, may be {@code null}
+   * @param dataType data type of the column
+   * @param rowSize maximum number of rows to keep
+   * @return a detached copy of the column, {@code null} for a {@code null} input, or the input
+   *     itself when its array type is not one of the handled ones
+   */
+  private Object convertColumnToTablet(
+      final Object columnValue, final TSDataType dataType, final int rowSize) {
+    if (columnValue == null) {
+      return null;
+    }
+
+    if (TSDataType.DATE.equals(dataType)) {
+      final int[] encodedDates = (int[]) columnValue;
+      final int copyLength = Math.min(encodedDates.length, rowSize);
+      // New object arrays are null-filled, so rows past copyLength need no explicit reset.
+      final LocalDate[] decodedDates = new LocalDate[rowSize];
+      for (int row = 0; row < copyLength; row++) {
+        decodedDates[row] = DateUtils.parseIntToLocalDate(encodedDates[row]);
+      }
+      return decodedDates;
+    }
+
+    if (columnValue instanceof boolean[]) {
+      final boolean[] source = (boolean[]) columnValue;
+      return Arrays.copyOf(source, Math.min(source.length, rowSize));
+    }
+    if (columnValue instanceof int[]) {
+      final int[] source = (int[]) columnValue;
+      return Arrays.copyOf(source, Math.min(source.length, rowSize));
+    }
+    if (columnValue instanceof long[]) {
+      final long[] source = (long[]) columnValue;
+      return Arrays.copyOf(source, Math.min(source.length, rowSize));
+    }
+    if (columnValue instanceof float[]) {
+      final float[] source = (float[]) columnValue;
+      return Arrays.copyOf(source, Math.min(source.length, rowSize));
+    }
+    if (columnValue instanceof double[]) {
+      final double[] source = (double[]) columnValue;
+      return Arrays.copyOf(source, Math.min(source.length, rowSize));
+    }
+    if (columnValue instanceof Binary[]) {
+      // Shallow copy: a new array referencing the same Binary objects.
+      final Binary[] source = (Binary[]) columnValue;
+      return Arrays.copyOf(source, Math.min(source.length, rowSize));
+    }
+
+    // Any other array type is handed through unchanged.
+    return columnValue;
+  }
+
@Override
public String toString() {
final int size =
CommonDescriptor.getInstance().getConfig().getPathLogMaxSize();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 1a122c63d4f..0cc4470882e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -437,7 +437,7 @@ public class PipeDataNodeThriftRequestTest {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
t.serialize(outputStream);
- ReadWriteIOUtils.write(false, outputStream);
+ ReadWriteIOUtils.write(true, outputStream);
tabletBuffers.add(
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size()));
tabletDataBase.add("test");
@@ -459,7 +459,7 @@ public class PipeDataNodeThriftRequestTest {
new byte[] {'a', 'b'},
deserializedReq.getBinaryReqs().get(0).getByteBuffer().array());
Assert.assertEquals(node,
deserializedReq.getInsertNodeReqs().get(0).getInsertNode());
Assert.assertEquals(t, deserializedReq.getTabletReqs().get(0).getTablet());
- Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned());
+ Assert.assertTrue(deserializedReq.getTabletReqs().get(0).getIsAligned());
Assert.assertEquals("test",
deserializedReq.getBinaryReqs().get(0).getDataBaseName());
Assert.assertEquals("test",
deserializedReq.getTabletReqs().get(0).getDataBaseName());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeStatementEventSorterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeStatementEventSorterTest.java
new file mode 100644
index 00000000000..7c13d9764b3
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeStatementEventSorterTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink;
+
+import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
+import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Verifies that the tree- and table-model sorters order and deduplicate the rows of an {@link
+ * InsertTabletStatement} that was built from a {@link Tablet}.
+ */
+public class PipeStatementEventSorterTest {
+
+  /** Number of INT64 measurements (s1, s2, s3) used by the tree-model tests. */
+  private static final int TREE_MODEL_COLUMN_COUNT = 3;
+
+  @Test
+  public void testTreeModelDeduplicateAndSort() throws Exception {
+    final List<IMeasurementSchema> schemaList = createTreeModelSchemas();
+    final Tablet tablet = new Tablet("root.sg.device", schemaList, 30);
+
+    // Each iteration appends an ascending, a descending and a constant timestamp, so the tablet
+    // is unsorted and contains many duplicates of the constant one.
+    final long timestamp = 300;
+    for (long i = 0; i < 10; i++) {
+      appendRow(tablet, schemaList, timestamp + i);
+      appendRow(tablet, schemaList, timestamp - i);
+      appendRow(tablet, schemaList, timestamp);
+    }
+
+    final Set<Integer> indices = new HashSet<>();
+    for (int i = 0; i < 30; i++) {
+      indices.add((int) tablet.getTimestamp(i));
+    }
+
+    Assert.assertFalse(tablet.isSorted());
+
+    final InsertTabletStatement statement = new InsertTabletStatement(tablet, true, null);
+    new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
+
+    assertTreeModelStatementSorted(statement, indices.size());
+  }
+
+  @Test
+  public void testTreeModelDeduplicate() throws Exception {
+    final List<IMeasurementSchema> schemaList = createTreeModelSchemas();
+    final Tablet tablet = new Tablet("root.sg.device", schemaList, 10);
+
+    // All rows share one timestamp and each column is null in every third row.
+    final long timestamp = 300;
+    for (long i = 0; i < 10; i++) {
+      final int rowIndex = tablet.getRowSize();
+      tablet.addTimestamp(rowIndex, timestamp);
+      for (int s = 0; s < TREE_MODEL_COLUMN_COUNT; s++) {
+        tablet.addValue(
+            schemaList.get(s).getMeasurementName(),
+            rowIndex,
+            (i + s) % 3 != 0 ? timestamp + i : null);
+      }
+    }
+
+    final Set<Integer> indices = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      indices.add((int) tablet.getTimestamp(i));
+    }
+
+    Assert.assertTrue(tablet.isSorted());
+
+    final InsertTabletStatement statement = new InsertTabletStatement(tablet, true, null);
+    new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
+
+    Assert.assertEquals(indices.size(), statement.getRowCount());
+
+    // Expected: per column, the last non-null value written for the shared timestamp.
+    final long[] timestamps =
+        Arrays.copyOfRange(statement.getTimes(), 0, statement.getRowCount());
+    final Object[] columns = statement.getColumns();
+    Assert.assertEquals(timestamps[0] + 8, ((long[]) columns[0])[0]);
+    for (int i = 1; i < TREE_MODEL_COLUMN_COUNT; ++i) {
+      Assert.assertEquals(timestamps[0] + 9, ((long[]) columns[i])[0]);
+    }
+  }
+
+  @Test
+  public void testTreeModelSort() throws Exception {
+    final List<IMeasurementSchema> schemaList = createTreeModelSchemas();
+    final Tablet tablet = new Tablet("root.sg.device", schemaList, 30);
+
+    // Rows get timestamps rowIndex + 2, rowIndex and rowIndex - 2: all distinct, but unsorted.
+    for (long i = 0; i < 10; i++) {
+      appendRow(tablet, schemaList, tablet.getRowSize() + 2L);
+      appendRow(tablet, schemaList, tablet.getRowSize());
+      appendRow(tablet, schemaList, tablet.getRowSize() - 2L);
+    }
+
+    final Set<Integer> indices = new HashSet<>();
+    for (int i = 0; i < 30; i++) {
+      indices.add((int) tablet.getTimestamp(i));
+    }
+
+    Assert.assertFalse(tablet.isSorted());
+
+    // Sanity-check the input: every column mirrors the timestamps and adjacent rows differ.
+    final long[] tabletTimestamps =
+        Arrays.copyOfRange(tablet.getTimestamps(), 0, tablet.getRowSize());
+    for (int i = 0; i < TREE_MODEL_COLUMN_COUNT; ++i) {
+      Assert.assertArrayEquals(
+          tabletTimestamps,
+          Arrays.copyOfRange((long[]) tablet.getValues()[i], 0, tablet.getRowSize()));
+    }
+    for (int i = 1; i < tablet.getRowSize(); ++i) {
+      Assert.assertTrue(tabletTimestamps[i] != tabletTimestamps[i - 1]);
+      for (int j = 0; j < TREE_MODEL_COLUMN_COUNT; ++j) {
+        Assert.assertNotEquals((long) tablet.getValue(i, j), (long) tablet.getValue(i - 1, j));
+      }
+    }
+
+    final InsertTabletStatement statement = new InsertTabletStatement(tablet, true, null);
+    new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
+
+    assertTreeModelStatementSorted(statement, indices.size());
+  }
+
+  @Test
+  public void testTableModelDeduplicateAndSort() throws Exception {
+    doTableModelTest(true, true);
+  }
+
+  @Test
+  public void testTableModelDeduplicate() throws Exception {
+    doTableModelTest(true, false);
+  }
+
+  @Test
+  public void testTableModelSort() throws Exception {
+    doTableModelTest(false, true);
+  }
+
+  @Test
+  public void testTableModelSort1() throws Exception {
+    doTableModelTest1();
+  }
+
+  /** Sorts and deduplicates a generated table-model statement by device ID and timestamp. */
+  public void doTableModelTest(final boolean hasDuplicates, final boolean isUnSorted)
+      throws Exception {
+    final Tablet tablet =
+        PipeTabletEventSorterTest.generateTablet("test", 10, hasDuplicates, isUnSorted);
+    final InsertTabletStatement statement = new InsertTabletStatement(tablet, false, "test_db");
+
+    new PipeTableModelTabletEventSorter(statement).sortAndDeduplicateByDevIdTimestamp();
+
+    assertTableModelStatementSorted(statement);
+  }
+
+  /** Sorts a generated, unsorted table-model statement by timestamp only. */
+  public void doTableModelTest1() throws Exception {
+    final Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, false, true);
+    final InsertTabletStatement statement = new InsertTabletStatement(tablet, false, "test_db");
+
+    new PipeTableModelTabletEventSorter(statement).sortByTimestampIfNecessary();
+
+    assertTableModelStatementSorted(statement);
+  }
+
+  /** Creates the INT64 schemas s1, s2 and s3 shared by the tree-model tests. */
+  private static List<IMeasurementSchema> createTreeModelSchemas() {
+    final List<IMeasurementSchema> schemaList = new ArrayList<>(TREE_MODEL_COLUMN_COUNT);
+    for (int i = 1; i <= TREE_MODEL_COLUMN_COUNT; i++) {
+      schemaList.add(new MeasurementSchema("s" + i, TSDataType.INT64));
+    }
+    return schemaList;
+  }
+
+  /** Appends one row whose timestamp and every column value all equal {@code value}. */
+  private static void appendRow(
+      final Tablet tablet, final List<IMeasurementSchema> schemaList, final long value) {
+    final int rowIndex = tablet.getRowSize();
+    tablet.addTimestamp(rowIndex, value);
+    for (final IMeasurementSchema schema : schemaList) {
+      tablet.addValue(schema.getMeasurementName(), rowIndex, value);
+    }
+  }
+
+  /**
+   * Asserts that a sorted tree-model statement has {@code expectedRowCount} rows, that every column
+   * still mirrors the timestamps, and that timestamps and values strictly increase.
+   */
+  private static void assertTreeModelStatementSorted(
+      final InsertTabletStatement statement, final int expectedRowCount) {
+    Assert.assertEquals(expectedRowCount, statement.getRowCount());
+
+    final int rowCount = statement.getRowCount();
+    final long[] timestamps = Arrays.copyOfRange(statement.getTimes(), 0, rowCount);
+    final Object[] columns = statement.getColumns();
+    for (int i = 0; i < TREE_MODEL_COLUMN_COUNT; ++i) {
+      Assert.assertArrayEquals(timestamps, Arrays.copyOfRange((long[]) columns[i], 0, rowCount));
+    }
+    for (int i = 1; i < rowCount; ++i) {
+      Assert.assertTrue(timestamps[i] > timestamps[i - 1]);
+      for (int j = 0; j < TREE_MODEL_COLUMN_COUNT; ++j) {
+        Assert.assertTrue(((long[]) columns[j])[i] > ((long[]) columns[j])[i - 1]);
+      }
+    }
+  }
+
+  /**
+   * Asserts that a sorted table-model statement has strictly increasing timestamps and that every
+   * row from index 1 on holds the expected values for that row index.
+   */
+  private static void assertTableModelStatementSorted(final InsertTabletStatement statement) {
+    final long[] timestamps = statement.getTimes();
+    final Object[] columns = statement.getColumns();
+    for (int i = 1; i < statement.getRowCount(); i++) {
+      Assert.assertTrue(timestamps[i] > timestamps[i - 1]);
+      Assert.assertEquals(
+          new Binary(String.valueOf(i / 100).getBytes(StandardCharsets.UTF_8)),
+          ((Binary[]) columns[0])[i]);
+      Assert.assertEquals((long) i, ((long[]) columns[1])[i]);
+      Assert.assertEquals(i * 1.0f, ((float[]) columns[2])[i], 0.001f);
+      Assert.assertEquals(
+          new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8)),
+          ((Binary[]) columns[3])[i]);
+      Assert.assertEquals((long) i, ((long[]) columns[4])[i]);
+      Assert.assertEquals(i, ((int[]) columns[5])[i]);
+      Assert.assertEquals(i * 0.1, ((double[]) columns[6])[i], 0.0001);
+      // DATE is stored as int[] in the statement, not as LocalDate[].
+      final LocalDate expectedDate = PipeTabletEventSorterTest.getDate(i);
+      Assert.assertEquals(
+          org.apache.tsfile.utils.DateUtils.parseDateExpressionToInt(expectedDate),
+          ((int[]) columns[7])[i]);
+      Assert.assertEquals(
+          new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8)),
+          ((Binary[]) columns[8])[i]);
+    }
+  }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverterTest.java
new file mode 100644
index 00000000000..410afc76130
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverterTest.java
@@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.util;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TabletStatementConverterTest {
+
+  @Test
+  public void testConvertStatementToTabletTreeModel() throws MetadataException {
+    // A Statement built from an aligned tree-model Tablet must convert back into an equal Tablet.
+    final Tablet expected = generateTreeModelTablet("root.sg.device", 1000, 100);
+    final InsertTabletStatement statement = new InsertTabletStatement(expected, true, null);
+    assertTabletsEqual(expected, statement.convertToTablet());
+  }
+
+  @Test
+  public void testConvertStatementToTabletTableModel() throws MetadataException {
+    // A Statement built from a non-aligned table-model Tablet in database "test_db" must convert
+    // back into an equal Tablet.
+    final Tablet expected = generateTableModelTablet("table1", 1000, 100);
+    final InsertTabletStatement statement = new InsertTabletStatement(expected, false, "test_db");
+    assertTabletsEqual(expected, statement.convertToTablet());
+  }
+
+  @Test
+  public void testDeserializeStatementFromTabletFormat() throws IOException, MetadataException {
+    final String deviceName = "root.sg.device";
+    final int columnCount = 1000;
+    final int rowCount = 100;
+    final boolean isAligned = true;
+    final Tablet originalTablet = generateTreeModelTablet(deviceName, columnCount, rowCount);
+
+    // Input bytes: the serialized Tablet followed by the isAligned flag.
+    final ByteBuffer buffer;
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      originalTablet.serialize(outputStream);
+      ReadWriteIOUtils.write(isAligned, outputStream);
+      buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+
+    final InsertTabletStatement statement =
+        TabletStatementConverter.deserializeStatementFromTabletFormat(buffer);
+
+    // Header information carried over from the Tablet.
+    Assert.assertEquals(deviceName, statement.getDevicePath().getFullPath());
+    Assert.assertEquals(rowCount, statement.getRowCount());
+    Assert.assertEquals(columnCount, statement.getMeasurements().length);
+    Assert.assertEquals(isAligned, statement.isAligned());
+
+    // Column data, checked by turning the Statement back into a Tablet.
+    assertTabletsEqual(originalTablet, statement.convertToTablet());
+  }
+
+  @Test
+  public void testRoundTripConversionTreeModel() throws MetadataException, IOException {
+    final Tablet originalTablet = generateTreeModelTablet("root.sg.device", 1000, 100);
+
+    // Serialize the Tablet, then append the isAligned flag.
+    final ByteBuffer buffer;
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      originalTablet.serialize(outputStream);
+      ReadWriteIOUtils.write(true, outputStream);
+      buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+
+    // Tablet bytes -> Statement -> Tablet must reproduce the original.
+    final Tablet convertedTablet =
+        TabletStatementConverter.deserializeStatementFromTabletFormat(buffer).convertToTablet();
+    assertTabletsEqual(originalTablet, convertedTablet);
+  }
+
+  @Test
+  public void testRoundTripConversionTableModel() throws MetadataException {
+    final int columnCount = 1000;
+    final int rowCount = 100;
+    final String tableName = "table1";
+    final String databaseName = "test_db";
+    final boolean isAligned = false;
+
+    // Tablet -> Statement -> Tablet
+    final Tablet originalTablet = generateTableModelTablet(tableName, columnCount, rowCount);
+    final InsertTabletStatement originalStatement =
+        new InsertTabletStatement(originalTablet, isAligned, databaseName);
+    final Tablet convertedTablet = originalStatement.convertToTablet();
+    assertTabletsEqual(originalTablet, convertedTablet);
+
+    // Complete the round trip: a Statement rebuilt from the converted Tablet must convert back
+    // into a Tablet that still matches the original.
+    final InsertTabletStatement convertedStatement =
+        new InsertTabletStatement(convertedTablet, isAligned, databaseName);
+    assertTabletsEqual(originalTablet, convertedStatement.convertToTablet());
+  }
+
+  /**
+   * Generates a tree-model Tablet whose columns cycle through {@code ALL_DATA_TYPES}, filled with
+   * deterministic values derived from the row and column index. Row {@code r} has timestamp
+   * {@code r * 1000}.
+   *
+   * @param deviceName device name
+   * @param columnCount number of columns
+   * @param rowCount number of rows
+   * @return Tablet with test data
+   * @throws IllegalArgumentException if {@code ALL_DATA_TYPES} contains a type this generator does
+   *     not handle (raised while filling rows)
+   */
+  private Tablet generateTreeModelTablet(
+      final String deviceName, final int columnCount, final int rowCount) {
+    final List<IMeasurementSchema> schemaList = new ArrayList<>(columnCount);
+    final TSDataType[] dataTypes = new TSDataType[columnCount];
+    final String[] measurementNames = new String[columnCount];
+    for (int col = 0; col < columnCount; col++) {
+      final TSDataType dataType = ALL_DATA_TYPES[col % ALL_DATA_TYPES.length];
+      dataTypes[col] = dataType;
+      measurementNames[col] = "col_" + col + "_" + dataType.name();
+      schemaList.add(new MeasurementSchema(measurementNames[col], dataType));
+    }
+
+    // Fill row by row, computing each value directly instead of staging per-column arrays.
+    final Tablet tablet = new Tablet(deviceName, schemaList, rowCount);
+    for (int row = 0; row < rowCount; row++) {
+      final int rowIndex = tablet.getRowSize();
+      tablet.addTimestamp(rowIndex, row * 1000L);
+      for (int col = 0; col < columnCount; col++) {
+        final String name = measurementNames[col];
+        switch (dataTypes[col]) {
+          case BOOLEAN:
+            tablet.addValue(name, rowIndex, (row + col) % 2 == 0);
+            break;
+          case INT32:
+            tablet.addValue(name, rowIndex, row * 100 + col);
+            break;
+          case DATE:
+            // Valid dates starting from 2024-01-01
+            tablet.addValue(name, rowIndex, LocalDate.of(2024, 1, 1).plusDays((row + col) % 365));
+            break;
+          case INT64:
+          case TIMESTAMP:
+            tablet.addValue(name, rowIndex, (long) row * 1000L + col);
+            break;
+          case FLOAT:
+            tablet.addValue(name, rowIndex, row * 1.5f + col * 0.1f);
+            break;
+          case DOUBLE:
+            tablet.addValue(name, rowIndex, row * 2.5 + col * 0.01);
+            break;
+          case TEXT:
+          case STRING:
+          case BLOB:
+            // Explicit charset so the bytes do not depend on the platform default.
+            tablet.addValue(
+                name,
+                rowIndex,
+                new Binary(
+                    ("value_row_" + row + "_col_" + col)
+                        .getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+            break;
+          default:
+            throw new IllegalArgumentException("Unsupported data type: " + dataTypes[col]);
+        }
+      }
+    }
+
+    return tablet;
+  }
+
+  /**
+   * Generate a Tablet for the table model containing all data types, with the specified number of
+   * columns and rows.
+   *
+   * <p>Column {@code col} has type {@code ALL_DATA_TYPES[col % ALL_DATA_TYPES.length]}, is named
+   * {@code "col_<col>_<TYPE>"} and is a FIELD column. Row {@code row} has timestamp
+   * {@code row * 1000}.
+   *
+   * @param tableName table name
+   * @param columnCount number of columns
+   * @param rowCount number of rows
+   * @return Tablet with test data
+   * @throws IllegalArgumentException if a data type has no test-data generator
+   */
+  private Tablet generateTableModelTablet(
+      final String tableName, final int columnCount, final int rowCount) {
+    final List<String> measurementNames = new ArrayList<>(columnCount);
+    final List<TSDataType> dataTypes = new ArrayList<>(columnCount);
+    final List<ColumnCategory> columnTypes = new ArrayList<>(columnCount);
+    final Object[] columnData = new Object[columnCount];
+
+    // Create the column definitions and generate the test data of each column
+    for (int col = 0; col < columnCount; col++) {
+      final TSDataType dataType = ALL_DATA_TYPES[col % ALL_DATA_TYPES.length];
+      measurementNames.add("col_" + col + "_" + dataType.name());
+      dataTypes.add(dataType);
+      // Table-model columns can be TAG/ATTRIBUTE/FIELD; FIELD is used for simplicity
+      columnTypes.add(ColumnCategory.FIELD);
+
+      switch (dataType) {
+        case BOOLEAN:
+          final boolean[] boolValues = new boolean[rowCount];
+          for (int row = 0; row < rowCount; row++) {
+            boolValues[row] = (row + col) % 2 == 0;
+          }
+          columnData[col] = boolValues;
+          break;
+        case INT32:
+          final int[] intValues = new int[rowCount];
+          for (int row = 0; row < rowCount; row++) {
+            intValues[row] = row * 100 + col;
+          }
+          columnData[col] = intValues;
+          break;
+        case DATE:
+          final LocalDate[] dateValues = new LocalDate[rowCount];
+          for (int row = 0; row < rowCount; row++) {
+            // Generate valid dates starting from 2024-01-01
+            dateValues[row] = LocalDate.of(2024, 1, 1).plusDays((row + col) % 365);
+          }
+          columnData[col] = dateValues;
+          break;
+        case INT64:
+        case TIMESTAMP:
+          final long[] longValues = new long[rowCount];
+          for (int row = 0; row < rowCount; row++) {
+            longValues[row] = (long) row * 1000L + col;
+          }
+          columnData[col] = longValues;
+          break;
+        case FLOAT:
+          final float[] floatValues = new float[rowCount];
+          for (int row = 0; row < rowCount; row++) {
+            floatValues[row] = row * 1.5f + col * 0.1f;
+          }
+          columnData[col] = floatValues;
+          break;
+        case DOUBLE:
+          final double[] doubleValues = new double[rowCount];
+          for (int row = 0; row < rowCount; row++) {
+            doubleValues[row] = row * 2.5 + col * 0.01;
+          }
+          columnData[col] = doubleValues;
+          break;
+        case TEXT:
+        case STRING:
+        case BLOB:
+          final Binary[] binaryValues = new Binary[rowCount];
+          for (int row = 0; row < rowCount; row++) {
+            // Explicit charset: the no-arg getBytes() depends on the platform default
+            binaryValues[row] =
+                new Binary(
+                    ("value_row_" + row + "_col_" + col)
+                        .getBytes(java.nio.charset.StandardCharsets.UTF_8));
+          }
+          columnData[col] = binaryValues;
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported data type: " + dataType);
+      }
+    }
+
+    // Table model constructor: Tablet(String, List<String>, List<TSDataType>,
+    // List<ColumnCategory>, int)
+    final Tablet tablet =
+        new Tablet(tableName, measurementNames, dataTypes, columnTypes, rowCount);
+    tablet.initBitMaps();
+
+    // Fill the tablet row by row; row r gets timestamp r * 1000
+    for (int row = 0; row < rowCount; row++) {
+      final int rowIndex = tablet.getRowSize();
+      tablet.addTimestamp(rowIndex, row * 1000L);
+      for (int col = 0; col < columnCount; col++) {
+        final String measurementName = measurementNames.get(col);
+        final Object data = columnData[col];
+        final TSDataType dataType = dataTypes.get(col);
+        switch (dataType) {
+          case BOOLEAN:
+            tablet.addValue(measurementName, rowIndex, ((boolean[]) data)[row]);
+            break;
+          case INT32:
+            tablet.addValue(measurementName, rowIndex, ((int[]) data)[row]);
+            break;
+          case DATE:
+            tablet.addValue(measurementName, rowIndex, ((LocalDate[]) data)[row]);
+            break;
+          case INT64:
+          case TIMESTAMP:
+            tablet.addValue(measurementName, rowIndex, ((long[]) data)[row]);
+            break;
+          case FLOAT:
+            tablet.addValue(measurementName, rowIndex, ((float[]) data)[row]);
+            break;
+          case DOUBLE:
+            tablet.addValue(measurementName, rowIndex, ((double[]) data)[row]);
+            break;
+          case TEXT:
+          case STRING:
+          case BLOB:
+            tablet.addValue(measurementName, rowIndex, ((Binary[]) data)[row]);
+            break;
+          default:
+            // Unreachable while the generator above rejects the same types; guards divergence
+            throw new IllegalArgumentException("Unsupported data type: " + dataType);
+        }
+      }
+      // Advance rowSize explicitly instead of relying on addTimestamp/addValue to do it, so the
+      // next iteration's getRowSize() points at the next free row
+      tablet.setRowSize(rowIndex + 1);
+    }
+
+    return tablet;
+  }
+
+  /**
+   * Assert that two Tablets have the same device id, row count, column count, timestamps and cell
+   * values.
+   *
+   * <p>Only the first {@code expected.getRowSize()} rows are compared, so tablets whose backing
+   * arrays have different capacities still compare equal when their valid rows match. FLOAT and
+   * DOUBLE cells are compared with a tolerance of 1e-4. A column whose data type is not handled
+   * below fails the assertion instead of being skipped.
+   *
+   * @param expected expected Tablet
+   * @param actual actual Tablet
+   */
+  private void assertTabletsEqual(final Tablet expected, final Tablet actual) {
+    Assert.assertEquals(expected.getDeviceId(), actual.getDeviceId());
+    Assert.assertEquals(expected.getRowSize(), actual.getRowSize());
+    Assert.assertEquals(expected.getSchemas().size(), actual.getSchemas().size());
+
+    final int rowCount = expected.getRowSize();
+    final int columnCount = expected.getSchemas().size();
+
+    // Verify timestamps of the valid rows only; slots beyond rowSize are unused capacity
+    final long[] expectedTimes = expected.getTimestamps();
+    final long[] actualTimes = actual.getTimestamps();
+    Assert.assertNotNull("Actual timestamps are null", actualTimes);
+    Assert.assertTrue(
+        "Actual timestamps shorter than row count: " + actualTimes.length,
+        actualTimes.length >= rowCount);
+    for (int row = 0; row < rowCount; row++) {
+      Assert.assertEquals(
+          "Timestamp mismatch at row " + row, expectedTimes[row], actualTimes[row]);
+    }
+
+    // Verify each column
+    final Object[] expectedValues = expected.getValues();
+    final Object[] actualValues = actual.getValues();
+    for (int col = 0; col < columnCount; col++) {
+      final TSDataType dataType = expected.getSchemas().get(col).getType();
+      final Object expectedColumn = expectedValues[col];
+      final Object actualColumn = actualValues[col];
+      Assert.assertNotNull("Actual column " + col + " is null", actualColumn);
+
+      // Verify each row in this column
+      for (int row = 0; row < rowCount; row++) {
+        final String location = "column " + col + " (" + dataType + "), row " + row;
+        switch (dataType) {
+          case BOOLEAN:
+            Assert.assertEquals(
+                location, ((boolean[]) expectedColumn)[row], ((boolean[]) actualColumn)[row]);
+            break;
+          case INT32:
+            Assert.assertEquals(
+                location, ((int[]) expectedColumn)[row], ((int[]) actualColumn)[row]);
+            break;
+          case DATE:
+            Assert.assertEquals(
+                location, ((LocalDate[]) expectedColumn)[row], ((LocalDate[]) actualColumn)[row]);
+            break;
+          case INT64:
+          case TIMESTAMP:
+            Assert.assertEquals(
+                location, ((long[]) expectedColumn)[row], ((long[]) actualColumn)[row]);
+            break;
+          case FLOAT:
+            Assert.assertEquals(
+                location,
+                ((float[]) expectedColumn)[row],
+                ((float[]) actualColumn)[row],
+                0.0001f);
+            break;
+          case DOUBLE:
+            Assert.assertEquals(
+                location,
+                ((double[]) expectedColumn)[row],
+                ((double[]) actualColumn)[row],
+                0.0001);
+            break;
+          case TEXT:
+          case STRING:
+          case BLOB:
+            final Binary actualBinary = ((Binary[]) actualColumn)[row];
+            Assert.assertNotNull(location, actualBinary);
+            Assert.assertEquals(location, ((Binary[]) expectedColumn)[row], actualBinary);
+            break;
+          default:
+            Assert.fail("Unsupported data type at " + location);
+        }
+      }
+    }
+  }
+
+  /**
+   * Assert that a Tablet and an InsertTabletStatement have the same device path, row count,
+   * column count, timestamps and cell values.
+   *
+   * <p>Only the first {@code tablet.getRowSize()} rows are compared, so unused capacity beyond
+   * the row count is ignored. DATE cells are compared after converting the Tablet's LocalDate with
+   * {@code DateUtils.parseDateExpressionToInt}, since the statement holds int values for DATE.
+   * FLOAT and DOUBLE cells use a tolerance of 1e-4. A column whose data type is not handled below
+   * fails the assertion instead of being skipped.
+   *
+   * @param tablet Tablet
+   * @param statement InsertTabletStatement
+   */
+  private void assertTabletAndStatementEqual(
+      final Tablet tablet, final InsertTabletStatement statement) {
+    Assert.assertEquals(
+        tablet.getDeviceId(),
+        statement.getDevicePath() != null ? statement.getDevicePath().getFullPath() : null);
+    Assert.assertEquals(tablet.getRowSize(), statement.getRowCount());
+    Assert.assertEquals(tablet.getSchemas().size(), statement.getMeasurements().length);
+
+    final int rowCount = tablet.getRowSize();
+    final int columnCount = tablet.getSchemas().size();
+
+    // Verify timestamps of the valid rows only; slots beyond rowSize are unused capacity
+    final long[] tabletTimes = tablet.getTimestamps();
+    final long[] statementTimes = statement.getTimes();
+    Assert.assertNotNull("Statement times are null", statementTimes);
+    Assert.assertTrue(
+        "Statement times shorter than row count: " + statementTimes.length,
+        statementTimes.length >= rowCount);
+    for (int row = 0; row < rowCount; row++) {
+      Assert.assertEquals(
+          "Timestamp mismatch at row " + row, tabletTimes[row], statementTimes[row]);
+    }
+
+    // Verify each column
+    final Object[] tabletValues = tablet.getValues();
+    final Object[] statementColumns = statement.getColumns();
+    for (int col = 0; col < columnCount; col++) {
+      final TSDataType dataType = tablet.getSchemas().get(col).getType();
+      final Object tabletColumn = tabletValues[col];
+      final Object statementColumn = statementColumns[col];
+      Assert.assertNotNull("Statement column " + col + " is null", statementColumn);
+
+      // Verify each row in this column
+      for (int row = 0; row < rowCount; row++) {
+        final String location = "column " + col + " (" + dataType + "), row " + row;
+        switch (dataType) {
+          case BOOLEAN:
+            Assert.assertEquals(
+                location, ((boolean[]) tabletColumn)[row], ((boolean[]) statementColumn)[row]);
+            break;
+          case INT32:
+            Assert.assertEquals(
+                location, ((int[]) tabletColumn)[row], ((int[]) statementColumn)[row]);
+            break;
+          case DATE:
+            // Tablet holds LocalDate[]; the statement holds int[] for DATE columns
+            Assert.assertEquals(
+                location,
+                DateUtils.parseDateExpressionToInt(((LocalDate[]) tabletColumn)[row]),
+                ((int[]) statementColumn)[row]);
+            break;
+          case INT64:
+          case TIMESTAMP:
+            Assert.assertEquals(
+                location, ((long[]) tabletColumn)[row], ((long[]) statementColumn)[row]);
+            break;
+          case FLOAT:
+            Assert.assertEquals(
+                location,
+                ((float[]) tabletColumn)[row],
+                ((float[]) statementColumn)[row],
+                0.0001f);
+            break;
+          case DOUBLE:
+            Assert.assertEquals(
+                location,
+                ((double[]) tabletColumn)[row],
+                ((double[]) statementColumn)[row],
+                0.0001);
+            break;
+          case TEXT:
+          case STRING:
+          case BLOB:
+            final Binary statementBinary = ((Binary[]) statementColumn)[row];
+            Assert.assertNotNull(location, statementBinary);
+            Assert.assertEquals(location, ((Binary[]) tabletColumn)[row], statementBinary);
+            break;
+          default:
+            Assert.fail("Unsupported data type at " + location);
+        }
+      }
+    }
+  }
+
+  /**
+   * Every data type exercised by the tablet generators. {@code generateTableModelTablet} gives
+   * column {@code col} the type {@code ALL_DATA_TYPES[col % ALL_DATA_TYPES.length]}, so the order
+   * of this array decides which type each generated column gets.
+   */
+  private static final TSDataType[] ALL_DATA_TYPES =
+      new TSDataType[] {
+        TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT,
+        TSDataType.DOUBLE, TSDataType.TEXT, TSDataType.TIMESTAMP, TSDataType.DATE,
+        TSDataType.BLOB, TSDataType.STRING
+      };
+}