This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5c4c947eb64 Pipe: Removed the useless ser-de in receiver raw req &
Improved the handling logic for rowCount and null value bitmaps in
insertTabletNode (#16133)
5c4c947eb64 is described below
commit 5c4c947eb64b966e56dd3bd6be6c684a02a928da
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 12 16:44:53 2025 +0800
Pipe: Removed the useless ser-de in receiver raw req & Improved the
handling logic for rowCount and null value bitmaps in insertTabletNode (#16133)
* req
* test
* fix
* test
* partial
* bug-fix
* completion
---
.../statement/PipeConvertedInsertRowStatement.java | 2 +-
.../PipeConvertedInsertTabletStatement.java | 2 +-
.../request/PipeTransferTabletRawReq.java | 22 +---------
.../request/PipeTransferTabletRawReqV2.java | 36 +---------------
.../plan/planner/plan/node/write/InsertNode.java | 9 ++--
.../planner/plan/node/write/InsertTabletNode.java | 32 +++++++-------
.../plan/node/write/RelationalInsertRowNode.java | 6 +--
.../plan/node/write/RelationalInsertRowsNode.java | 6 +--
.../node/write/RelationalInsertTabletNode.java | 12 +++---
.../plan/statement/crud/InsertBaseStatement.java | 22 +++++-----
.../plan/statement/crud/InsertRowStatement.java | 6 +--
.../plan/statement/crud/InsertTabletStatement.java | 50 ++++++++++++++++++++--
12 files changed, 96 insertions(+), 109 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
index edf8b636ecf..2484fd18de8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
@@ -53,7 +53,7 @@ public class PipeConvertedInsertRowStatement extends InsertRowStatement {
measurements = insertRowStatement.getMeasurements();
dataTypes = insertRowStatement.getDataTypes();
columnCategories = insertRowStatement.getColumnCategories();
- idColumnIndices = insertRowStatement.getIdColumnIndices();
+ tagColumnIndices = insertRowStatement.getTagColumnIndices();
attrColumnIndices = insertRowStatement.getAttrColumnIndices();
writeToTable = insertRowStatement.isWriteToTable();
databaseName = insertRowStatement.getDatabaseName().orElse(null);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
index eee47a5b0b1..cf03c7c0c94 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
@@ -46,7 +46,7 @@ public class PipeConvertedInsertTabletStatement extends InsertTabletStatement {
devicePath = insertTabletStatement.getDevicePath();
isAligned = insertTabletStatement.isAligned();
columnCategories = insertTabletStatement.getColumnCategories();
- idColumnIndices = insertTabletStatement.getIdColumnIndices();
+ tagColumnIndices = insertTabletStatement.getTagColumnIndices();
attrColumnIndices = insertTabletStatement.getAttrColumnIndices();
writeToTable = insertTabletStatement.isWriteToTable();
databaseName = insertTabletStatement.getDatabaseName().orElse(null);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 4e3888f8d0e..7da44f297d1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -22,18 +22,13 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
import org.apache.iotdb.commons.exception.MetadataException;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
-import org.apache.iotdb.commons.utils.PathUtils;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
-import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.session.util.SessionUtils;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,22 +63,7 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq {
return new InsertTabletStatement();
}
- final TSInsertTabletReq request = new TSInsertTabletReq();
-
- for (final IMeasurementSchema measurementSchema : tablet.getSchemas()) {
- request.addToMeasurements(measurementSchema.getMeasurementName());
- request.addToTypes(measurementSchema.getType().ordinal());
- }
-
- request.setPrefixPath(tablet.getDeviceId());
- request.setIsAligned(isAligned);
- request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
- request.setValues(SessionUtils.getValueBuffer(tablet));
- request.setSize(tablet.getRowSize());
- request.setMeasurements(
- PathUtils.checkIsLegalSingleMeasurementsAndUpdate(request.getMeasurements()));
-
- return StatementGenerator.createStatement(request);
+ return new InsertTabletStatement(tablet, isAligned, null);
} catch (final MetadataException e) {
LOGGER.warn("Generate Statement from tablet {} error.", tablet, e);
return null;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
index ee0e2fb220a..43d8501252c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
@@ -22,19 +22,14 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
import org.apache.iotdb.commons.exception.MetadataException;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
-import org.apache.iotdb.commons.utils.PathUtils;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
-import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.session.util.SessionUtils;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +37,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-import java.util.stream.Collectors;
import static
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent.isTabletEmpty;
@@ -70,35 +64,7 @@ public class PipeTransferTabletRawReqV2 extends PipeTransferTabletRawReq {
return new InsertTabletStatement();
}
- final TSInsertTabletReq request = new TSInsertTabletReq();
-
- for (final IMeasurementSchema measurementSchema : tablet.getSchemas()) {
- request.addToMeasurements(measurementSchema.getMeasurementName());
- request.addToTypes(measurementSchema.getType().ordinal());
- }
-
- request.setPrefixPath(tablet.getDeviceId());
- request.setIsAligned(isAligned);
- request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
- request.setValues(SessionUtils.getValueBuffer(tablet));
- request.setSize(tablet.getRowSize());
-
- // Tree model
- if (Objects.isNull(dataBaseName)) {
- request.setMeasurements(
- PathUtils.checkIsLegalSingleMeasurementsAndUpdate(request.getMeasurements()));
- return StatementGenerator.createStatement(request);
- }
-
- // Table model
- request.setWriteToTable(true);
- request.columnCategories =
- tablet.getColumnTypes().stream()
- .map(t -> (byte) t.ordinal())
- .collect(Collectors.toList());
- final InsertTabletStatement statement = StatementGenerator.createStatement(request);
- statement.setDatabaseName(dataBaseName);
- return statement;
+ return new InsertTabletStatement(tablet, isAligned, dataBaseName);
} catch (final MetadataException e) {
LOGGER.warn("Generate Statement from tablet {} error.", tablet, e);
return null;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 102119269bb..f913bbd7a97 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -53,7 +53,6 @@ import java.util.Objects;
import java.util.stream.Collectors;
public abstract class InsertNode extends SearchNode {
-
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
/**
@@ -69,7 +68,7 @@ public abstract class InsertNode extends SearchNode {
protected TSDataType[] dataTypes;
protected TsTableColumnCategory[] columnCategories;
- protected List<Integer> idColumnIndices;
+ protected List<Integer> tagColumnIndices;
protected int measurementColumnCnt = -1;
protected int failedMeasurementNumber = 0;
@@ -339,7 +338,7 @@ public abstract class InsertNode extends SearchNode {
public boolean allMeasurementFailed() {
if (measurements != null) {
return failedMeasurementNumber
- >= measurements.length - (idColumnIndices == null ? 0 : idColumnIndices.size());
+ >= measurements.length - (tagColumnIndices == null ? 0 : tagColumnIndices.size());
}
return true;
}
@@ -398,10 +397,10 @@ public abstract class InsertNode extends SearchNode {
public void setColumnCategories(TsTableColumnCategory[] columnCategories) {
this.columnCategories = columnCategories;
if (columnCategories != null) {
- idColumnIndices = new ArrayList<>();
+ tagColumnIndices = new ArrayList<>();
for (int i = 0; i < columnCategories.length; i++) {
if (columnCategories[i].equals(TsTableColumnCategory.TAG)) {
- idColumnIndices.add(i);
+ tagColumnIndices.add(i);
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index c8b840cc6bb..be0b00931fd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -74,7 +74,6 @@ import java.util.Objects;
import static org.apache.iotdb.db.utils.CommonUtils.isAlive;
public class InsertTabletNode extends InsertNode implements WALEntryValue {
-
private static final String DATATYPE_UNSUPPORTED = "Data type %s is not
supported.";
protected long[] times; // times should be sorted. It is done in the session
API.
@@ -528,17 +527,17 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
}
- private void writeTimes(ByteBuffer buffer) {
+ private void writeTimes(final ByteBuffer buffer) {
ReadWriteIOUtils.write(rowCount, buffer);
- for (long time : times) {
- ReadWriteIOUtils.write(time, buffer);
+ for (int i = 0; i < rowCount; ++i) {
+ ReadWriteIOUtils.write(times[i], buffer);
}
}
- private void writeTimes(DataOutputStream stream) throws IOException {
+ private void writeTimes(final DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(rowCount, stream);
- for (long time : times) {
- ReadWriteIOUtils.write(time, stream);
+ for (int i = 0; i < rowCount; ++i) {
+ ReadWriteIOUtils.write(times[i], stream);
}
}
@@ -556,7 +555,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
ReadWriteIOUtils.write(BytesUtils.boolToByte(false), buffer);
} else {
ReadWriteIOUtils.write(BytesUtils.boolToByte(true), buffer);
- buffer.put(bitMaps[i].getByteArray());
+ buffer.put(bitMaps[i].getByteArray(), 0, BitMap.getSizeOfBytes(rowCount));
}
}
}
@@ -576,7 +575,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream);
} else {
ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream);
- stream.write(bitMaps[i].getByteArray());
+ stream.write(bitMaps[i].getByteArray(), 0, BitMap.getSizeOfBytes(rowCount));
}
}
}
@@ -643,7 +642,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
case STRING:
Binary[] binaryValues = (Binary[]) column;
for (int j = 0; j < rowCount; j++) {
- if (binaryValues[j] != null) {
+ if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
ReadWriteIOUtils.write(binaryValues[j], buffer);
} else {
ReadWriteIOUtils.write(0, buffer);
@@ -695,7 +694,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
case BLOB:
Binary[] binaryValues = (Binary[]) column;
for (int j = 0; j < rowCount; j++) {
- if (binaryValues[j] != null) {
+ if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
ReadWriteIOUtils.write(binaryValues[j], stream);
} else {
ReadWriteIOUtils.write(0, stream);
@@ -751,6 +750,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
bitMaps =
QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize,
rowCount).orElse(null);
}
+
columns =
QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes,
measurementSize, rowCount);
isAligned = buffer.get() == 1;
@@ -966,7 +966,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
case BLOB:
Binary[] binaryValues = (Binary[]) column;
for (int j = start; j < end; j++) {
- if (binaryValues[j] != null) {
+ if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
buffer.putInt(binaryValues[j].getLength());
buffer.put(binaryValues[j].getValues());
} else {
@@ -1310,10 +1310,10 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
+ Arrays.toString(measurements)
+ ", rowCount="
+ rowCount
- + ", timeRange=[,"
- + times[0]
- + ", "
- + times[times.length - 1]
+ + ", timeRange=["
+ + (Objects.nonNull(times) && times.length > 0
+ ? times[0] + ", " + times[times.length - 1]
+ : "")
+ "]"
+ '}';
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
index d4240970386..a824448dc78 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
@@ -87,10 +87,10 @@ public class RelationalInsertRowNode extends InsertRowNode {
@Override
public IDeviceID getDeviceID() {
if (deviceID == null) {
- String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+ String[] deviceIdSegments = new String[tagColumnIndices.size() + 1];
deviceIdSegments[0] = this.getTableName();
- for (int i = 0; i < idColumnIndices.size(); i++) {
- final Integer columnIndex = idColumnIndices.get(i);
+ for (int i = 0; i < tagColumnIndices.size(); i++) {
+ final Integer columnIndex = tagColumnIndices.get(i);
deviceIdSegments[i + 1] =
getValues()[columnIndex] != null ?
getValues()[columnIndex].toString() : null;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 71faebec3ca..77020d9220d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -59,10 +59,10 @@ public class RelationalInsertRowsNode extends InsertRowsNode {
deviceIDs = new IDeviceID[getInsertRowNodeList().size()];
}
if (deviceIDs[rowIdx] == null) {
- String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+ String[] deviceIdSegments = new String[tagColumnIndices.size() + 1];
deviceIdSegments[0] = this.getTableName();
- for (int i = 0; i < idColumnIndices.size(); i++) {
- final Integer columnIndex = idColumnIndices.get(i);
+ for (int i = 0; i < tagColumnIndices.size(); i++) {
+ final Integer columnIndex = tagColumnIndices.get(i);
deviceIdSegments[i + 1] =
((Object[])
getInsertRowNodeList().get(i).getValues()[columnIndex])[rowIdx].toString();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 0d4b698108e..6e5e8eeb598 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -115,10 +115,10 @@ public class RelationalInsertTabletNode extends InsertTabletNode {
deviceIDs = new IDeviceID[1];
}
if (deviceIDs[0] == null) {
- String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+ String[] deviceIdSegments = new String[tagColumnIndices.size() + 1];
deviceIdSegments[0] = this.getTableName();
- for (int i = 0; i < idColumnIndices.size(); i++) {
- final Integer columnIndex = idColumnIndices.get(i);
+ for (int i = 0; i < tagColumnIndices.size(); i++) {
+ final Integer columnIndex = tagColumnIndices.get(i);
Object idSeg = ((Object[]) columns[columnIndex])[0];
boolean isNull =
bitMaps != null && bitMaps[columnIndex] != null &&
bitMaps[columnIndex].isMarked(0);
@@ -132,10 +132,10 @@ public class RelationalInsertTabletNode extends InsertTabletNode {
deviceIDs = new IDeviceID[rowCount];
}
if (deviceIDs[rowIdx] == null) {
- String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+ String[] deviceIdSegments = new String[tagColumnIndices.size() + 1];
deviceIdSegments[0] = this.getTableName();
- for (int i = 0; i < idColumnIndices.size(); i++) {
- final Integer columnIndex = idColumnIndices.get(i);
+ for (int i = 0; i < tagColumnIndices.size(); i++) {
+ final Integer columnIndex = tagColumnIndices.get(i);
Object idSeg = ((Object[]) columns[columnIndex])[rowIdx];
boolean isNull =
bitMaps != null
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index 28efbd84a81..950195c5248 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -87,7 +87,7 @@ public abstract class InsertBaseStatement extends Statement implements Accountab
protected Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info;
protected TsTableColumnCategory[] columnCategories;
- protected List<Integer> idColumnIndices;
+ protected List<Integer> tagColumnIndices;
protected List<Integer> attrColumnIndices;
protected boolean writeToTable = false;
@@ -319,19 +319,19 @@ public abstract class InsertBaseStatement extends Statement implements Accountab
columnCategories = new TsTableColumnCategory[measurements.length];
}
this.columnCategories[i] = columnCategory;
- this.idColumnIndices = null;
+ this.tagColumnIndices = null;
}
- public List<Integer> getIdColumnIndices() {
- if (idColumnIndices == null && columnCategories != null) {
- idColumnIndices = new ArrayList<>();
+ public List<Integer> getTagColumnIndices() {
+ if (tagColumnIndices == null && columnCategories != null) {
+ tagColumnIndices = new ArrayList<>();
for (int i = 0; i < columnCategories.length; i++) {
if (columnCategories[i].equals(TsTableColumnCategory.TAG)) {
- idColumnIndices.add(i);
+ tagColumnIndices.add(i);
}
}
}
- return idColumnIndices;
+ return tagColumnIndices;
}
public List<Integer> getAttrColumnIndices() {
@@ -432,7 +432,7 @@ public abstract class InsertBaseStatement extends Statement implements Accountab
subRemoveAttributeColumns(columnsToKeep);
// to reconstruct indices
- idColumnIndices = null;
+ tagColumnIndices = null;
attrColumnIndices = null;
}
@@ -601,7 +601,7 @@ public abstract class InsertBaseStatement extends Statement implements Accountab
System.arraycopy(
columnCategories, pos, tmpCategories, pos + 1,
columnCategories.length - pos);
columnCategories = tmpCategories;
- idColumnIndices = null;
+ tagColumnIndices = null;
}
}
@@ -626,7 +626,7 @@ public abstract class InsertBaseStatement extends Statement implements Accountab
if (inputLocations != null) {
CommonUtils.swapArray(inputLocations, src, target);
}
- idColumnIndices = null;
+ tagColumnIndices = null;
}
public boolean isWriteToTable() {
@@ -707,7 +707,7 @@ public abstract class InsertBaseStatement extends Statement implements Accountab
+ RamUsageEstimator.shallowSizeOf(dataTypes)
+ RamUsageEstimator.shallowSizeOf(columnCategories)
// We assume that the integers are all cached by JVM
- + shallowSizeOfList(idColumnIndices)
+ + shallowSizeOfList(tagColumnIndices)
+ shallowSizeOfList(attrColumnIndices)
+ shallowSizeOfList(logicalViewSchemaList)
+ (Objects.nonNull(logicalViewSchemaList)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index 1f789ee0051..52901f6f7fe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -493,10 +493,10 @@ public class InsertRowStatement extends InsertBaseStatement implements ISchemaVa
@TableModel
public IDeviceID getTableDeviceID() {
if (deviceID == null) {
- String[] deviceIdSegments = new String[getIdColumnIndices().size() + 1];
+ String[] deviceIdSegments = new String[getTagColumnIndices().size() + 1];
deviceIdSegments[0] = this.getTableName();
- for (int i = 0; i < getIdColumnIndices().size(); i++) {
- final Integer columnIndex = getIdColumnIndices().get(i);
+ for (int i = 0; i < getTagColumnIndices().size(); i++) {
+ final Integer columnIndex = getTagColumnIndices().get(i);
deviceIdSegments[i + 1] =
values[columnIndex] != null ? values[columnIndex].toString() :
null;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 5aa327f5dbb..e7eafc4df72 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -20,7 +20,9 @@
package org.apache.iotdb.db.queryengine.plan.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
@@ -29,6 +31,7 @@ import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
+import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
@@ -47,11 +50,15 @@ import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -61,7 +68,6 @@ import java.util.Map;
import java.util.Objects;
public class InsertTabletStatement extends InsertBaseStatement implements
ISchemaValidation {
-
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(InsertTabletStatement.class);
@@ -90,6 +96,42 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem
this.recordedEndOfLogicalViewSchemaList = 0;
}
+ public InsertTabletStatement(
+ final Tablet tablet, final boolean isAligned, final String databaseName)
+ throws MetadataException {
+ this();
+ setMeasurements(
+ tablet.getSchemas().stream()
+ .map(IMeasurementSchema::getMeasurementName)
+ .toArray(String[]::new));
+ setDataTypes(
+ tablet.getSchemas().stream().map(IMeasurementSchema::getType).toArray(TSDataType[]::new));
+ setDevicePath(DataNodeDevicePathCache.getInstance().getPartialPath(tablet.getDeviceId()));
+ setAligned(isAligned);
+ setTimes(tablet.getTimestamps());
+ setColumns(Arrays.stream(tablet.getValues()).map(this::convertTableColumn).toArray());
+ setBitMaps(tablet.getBitMaps());
+ setRowCount(tablet.getRowSize());
+
+ if (Objects.nonNull(databaseName)) {
+ setWriteToTable(true);
+ setDatabaseName(databaseName);
+ setColumnCategories(
+ tablet.getColumnTypes().stream()
+ .map(TsTableColumnCategory::fromTsFileColumnCategory)
+ .toArray(TsTableColumnCategory[]::new));
+ }
+ }
+
+ private Object convertTableColumn(final Object input) {
+ return input instanceof LocalDate[]
+ ? Arrays.stream(((LocalDate[]) input))
+ .map(date -> Objects.nonNull(date) ? DateUtils.parseDateExpressionToInt(date) : 0)
+ .mapToInt(Integer::intValue)
+ .toArray()
+ : input;
+ }
+
public InsertTabletStatement(InsertTabletNode node) {
this();
setDevicePath(node.getTargetPath());
@@ -461,10 +503,10 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem
deviceIDs = new IDeviceID[rowCount];
}
if (deviceIDs[rowIdx] == null) {
- String[] deviceIdSegments = new String[getIdColumnIndices().size() + 1];
+ String[] deviceIdSegments = new String[getTagColumnIndices().size() + 1];
deviceIdSegments[0] = this.getTableName();
- for (int i = 0; i < getIdColumnIndices().size(); i++) {
- final Integer columnIndex = getIdColumnIndices().get(i);
+ for (int i = 0; i < getTagColumnIndices().size(); i++) {
+ final Integer columnIndex = getTagColumnIndices().get(i);
boolean isNull = isNull(rowIdx, i);
deviceIdSegments[i + 1] =
isNull ? null : ((Object[])
columns[columnIndex])[rowIdx].toString();