This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3f4c44ab3e8 Pipe: Fixed some unstable ITs (#17713)
3f4c44ab3e8 is described below
commit 3f4c44ab3e861a2d74ae16c20f5770ad8c75410e
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 19 11:46:27 2026 +0800
Pipe: Fixed some unstable ITs (#17713)
---
.../treemodel/manual/IoTDBPipePermissionIT.java | 5 +
.../evolvable/batch/PipeTabletEventPlainBatch.java | 19 +--
.../request/PipeTransferTabletBatchReqV2.java | 82 +++++++----
.../request/PipeTransferTabletBinaryReqV2.java | 11 +-
.../request/PipeTransferTabletInsertNodeReqV2.java | 11 +-
.../request/PipeTransferTabletRawReqV2.java | 33 +++--
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 4 +-
.../thrift/async/IoTDBDataRegionAsyncSink.java | 4 +-
.../thrift/sync/IoTDBDataRegionSyncSink.java | 4 +-
.../sink/protocol/writeback/WriteBackSink.java | 6 +-
.../pipe/sink/util/TabletStatementConverter.java | 21 ++-
.../pipe/sink/PipeDataNodeThriftRequestTest.java | 152 +++++++++++++++++++++
12 files changed, 288 insertions(+), 64 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index cf6eaf6a9c5..d43bb6564b1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -590,12 +590,16 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualTreeModelManualIT {
"count(root.vehicle.plane.pressure),",
Collections.singleton("1,"));
+ // After restart, the pipe keeps retrying with the stale password and may trigger login lock.
+ statement.execute("alter user thulab account unlock");
+
try {
statement.execute("alter pipe a2b modify source ('password'='fake')");
} catch (final SQLException e) {
Assert.assertEquals("801: Failed to check password for pipe a2b.",
e.getMessage());
}
+ statement.execute("alter user thulab account unlock");
statement.execute("alter pipe a2b modify source
('password'='newST@ongPassword')");
// Test empty alter
@@ -620,6 +624,7 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualTreeModelManualIT {
statement = connection.createStatement();
TestUtils.executeNonQuery(
senderEnv, "insert into root.vehicle.plane(temperature, pressure)
values (36.5, 1103)");
+ statement.execute("alter user thulab account unlock");
statement.execute("alter user thulab set password 'newST@ongPassword'");
statement.execute("alter pipe a2b");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 065ad3be840..34837424b98 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -54,7 +54,6 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
- private static final String TREE_MODEL_DATABASE_PLACEHOLDER = null;
private final List<String> insertNodeDataBases = new ArrayList<>();
private final List<String> tabletDataBases = new ArrayList<>();
@@ -160,13 +159,14 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
buffer = insertNode.serializeToByteBuffer();
insertNodeBuffers.add(buffer);
if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) {
- estimateSize =
- RamUsageEstimator.sizeOf(
-
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
-
insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
+ final String databaseName =
+ pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName();
+ estimateSize = RamUsageEstimator.sizeOf(databaseName);
+ insertNodeDataBases.add(databaseName);
} else {
- estimateSize = 4;
- insertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
+ final String databaseName =
pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName();
+ estimateSize = RamUsageEstimator.sizeOf(databaseName);
+ insertNodeDataBases.add(databaseName);
}
estimateSize += buffer.limit();
} else {
@@ -192,9 +192,10 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(),
outputStream);
buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
}
- estimateSize = 4 + buffer.limit();
+ final String databaseName =
pipeRawTabletInsertionEvent.getTreeModelDatabaseName();
+ estimateSize = RamUsageEstimator.sizeOf(databaseName) + buffer.limit();
tabletBuffers.add(buffer);
- tabletDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
+ tabletDataBases.add(databaseName);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
index f626d496b55..80550b6350f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
@@ -38,7 +38,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -55,14 +55,12 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
public List<InsertBaseStatement> constructStatements() {
final List<InsertBaseStatement> statements = new ArrayList<>();
- final InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
- final InsertMultiTabletsStatement insertMultiTabletsStatement =
- new InsertMultiTabletsStatement();
-
- final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
- final List<InsertTabletStatement> insertTabletStatementList = new
ArrayList<>();
final Map<String, List<InsertRowStatement>>
tableModelDatabaseInsertRowStatementMap =
- new HashMap<>();
+ new LinkedHashMap<>();
+ final Map<String, List<InsertRowStatement>>
treeModelDatabaseInsertRowStatementMap =
+ new LinkedHashMap<>();
+ final Map<String, List<InsertTabletStatement>>
treeModelDatabaseInsertTabletStatementMap =
+ new LinkedHashMap<>();
for (final PipeTransferTabletInsertNodeReqV2 insertNodeReq :
insertNodeReqs) {
final InsertBaseStatement statement = insertNodeReq.constructStatement();
@@ -77,9 +75,12 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
} else if (statement instanceof InsertTabletStatement) {
statements.add(statement);
} else if (statement instanceof InsertRowsStatement) {
- tableModelDatabaseInsertRowStatementMap
- .computeIfAbsent(statement.getDatabaseName().get(), k -> new
ArrayList<>())
- .addAll(((InsertRowsStatement)
statement).getInsertRowStatementList());
+ for (final InsertRowStatement insertRowStatement :
+ ((InsertRowsStatement) statement).getInsertRowStatementList()) {
+ tableModelDatabaseInsertRowStatementMap
+ .computeIfAbsent(insertRowStatement.getDatabaseName().get(), k
-> new ArrayList<>())
+ .add(insertRowStatement);
+ }
} else {
throw new UnsupportedOperationException(
String.format(
@@ -89,12 +90,21 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
continue;
}
if (statement instanceof InsertRowStatement) {
- insertRowStatementList.add((InsertRowStatement) statement);
+ treeModelDatabaseInsertRowStatementMap
+ .computeIfAbsent(statement.getDatabaseName().orElse(null), k ->
new ArrayList<>())
+ .add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
- insertTabletStatementList.add((InsertTabletStatement) statement);
+ treeModelDatabaseInsertTabletStatementMap
+ .computeIfAbsent(statement.getDatabaseName().orElse(null), k ->
new ArrayList<>())
+ .add((InsertTabletStatement) statement);
} else if (statement instanceof InsertRowsStatement) {
- insertRowStatementList.addAll(
- ((InsertRowsStatement) statement).getInsertRowStatementList());
+ for (final InsertRowStatement insertRowStatement :
+ ((InsertRowsStatement) statement).getInsertRowStatementList()) {
+ treeModelDatabaseInsertRowStatementMap
+ .computeIfAbsent(
+ insertRowStatement.getDatabaseName().orElse(null), k -> new
ArrayList<>())
+ .add(insertRowStatement);
+ }
} else {
throw new UnsupportedOperationException(
String.format(
@@ -112,17 +122,13 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
statements.add(statement);
continue;
}
- insertTabletStatementList.add(statement);
+ treeModelDatabaseInsertTabletStatementMap
+ .computeIfAbsent(statement.getDatabaseName().orElse(null), k -> new
ArrayList<>())
+ .add(statement);
}
- insertRowsStatement.setInsertRowStatementList(insertRowStatementList);
-
insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
- if (!insertRowsStatement.isEmpty()) {
- statements.add(insertRowsStatement);
- }
- if (!insertMultiTabletsStatement.isEmpty()) {
- statements.add(insertMultiTabletsStatement);
- }
+ addTreeModelInsertRowsStatements(statements,
treeModelDatabaseInsertRowStatementMap);
+ addTreeModelInsertTabletsStatements(statements,
treeModelDatabaseInsertTabletStatementMap);
for (final Map.Entry<String, List<InsertRowStatement>> insertRows :
tableModelDatabaseInsertRowStatementMap.entrySet()) {
@@ -136,6 +142,34 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
return statements;
}
+ private void addTreeModelInsertRowsStatements(
+ final List<InsertBaseStatement> statements,
+ final Map<String, List<InsertRowStatement>>
databaseInsertRowStatementMap) {
+ for (final Map.Entry<String, List<InsertRowStatement>> insertRows :
+ databaseInsertRowStatementMap.entrySet()) {
+ final InsertRowsStatement statement = new InsertRowsStatement();
+ statement.setInsertRowStatementList(insertRows.getValue());
+ if (insertRows.getKey() != null) {
+ statement.setDatabaseName(insertRows.getKey());
+ }
+ statements.add(statement);
+ }
+ }
+
+ private void addTreeModelInsertTabletsStatements(
+ final List<InsertBaseStatement> statements,
+ final Map<String, List<InsertTabletStatement>>
databaseInsertTabletStatementMap) {
+ for (final Map.Entry<String, List<InsertTabletStatement>> insertTablets :
+ databaseInsertTabletStatementMap.entrySet()) {
+ final InsertMultiTabletsStatement statement = new
InsertMultiTabletsStatement();
+ statement.setInsertTabletStatementList(insertTablets.getValue());
+ if (insertTablets.getKey() != null) {
+ statement.setDatabaseName(insertTablets.getKey());
+ }
+ statements.add(statement);
+ }
+ }
+
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferTabletBatchReqV2 toTPipeTransferReq(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java
index 8166e6b07ef..75501fe0f22 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBinaryReqV2.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -72,14 +73,18 @@ public class PipeTransferTabletBinaryReqV2 extends
PipeTransferTabletBinaryReq {
return statement;
}
- // Table model
- statement.setWriteToTable(true);
+ final boolean isTableModel = PathUtils.isTableModelDatabase(dataBaseName);
+ if (isTableModel) {
+ statement.setWriteToTable(true);
+ }
if (statement instanceof InsertRowsStatement) {
List<InsertRowStatement> rowStatements =
((InsertRowsStatement) statement).getInsertRowStatementList();
if (rowStatements != null && !rowStatements.isEmpty()) {
for (InsertRowStatement insertRowStatement : rowStatements) {
- insertRowStatement.setWriteToTable(true);
+ if (isTableModel) {
+ insertRowStatement.setWriteToTable(true);
+ }
insertRowStatement.setDatabaseName(dataBaseName);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java
index 599e12af9ed..e39330b5b0e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletInsertNodeReqV2.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -72,14 +73,18 @@ public class PipeTransferTabletInsertNodeReqV2 extends
PipeTransferTabletInsertN
return statement;
}
- // Table model
- statement.setWriteToTable(true);
+ final boolean isTableModel = PathUtils.isTableModelDatabase(dataBaseName);
+ if (isTableModel) {
+ statement.setWriteToTable(true);
+ }
if (statement instanceof InsertRowsStatement) {
List<InsertRowStatement> rowStatements =
((InsertRowsStatement) statement).getInsertRowStatementList();
if (rowStatements != null && !rowStatements.isEmpty()) {
for (InsertRowStatement insertRowStatement : rowStatements) {
- insertRowStatement.setWriteToTable(true);
+ if (isTableModel) {
+ insertRowStatement.setWriteToTable(true);
+ }
insertRowStatement.setDatabaseName(dataBaseName);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
index f6b910a8844..2458e5e243f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
import org.apache.iotdb.commons.exception.MetadataException;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
@@ -54,29 +55,45 @@ public class PipeTransferTabletRawReqV2 extends
PipeTransferTabletRawReq {
@Override
public InsertTabletStatement constructStatement() {
+ final boolean isTableModel =
+ Objects.nonNull(dataBaseName) &&
PathUtils.isTableModelDatabase(dataBaseName);
+
if (statement != null) {
- if (Objects.isNull(dataBaseName)) {
- new
PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
- } else {
+ if (isTableModel) {
new
PipeTableModelTabletEventSorter(statement).sortByTimestampIfNecessary();
+ statement.setWriteToTable(true);
+ } else {
+ new
PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
+ }
+ if (Objects.nonNull(dataBaseName)) {
+ statement.setDatabaseName(dataBaseName);
}
return statement;
}
- if (Objects.isNull(dataBaseName)) {
- new
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
- } else {
+ if (isTableModel) {
new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary();
+ } else {
+ new
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
}
try {
if (isTabletEmpty(tablet)) {
// Empty statement, will be filtered after construction
- return new InsertTabletStatement();
+ statement = new InsertTabletStatement();
+ return statement;
}
- return new InsertTabletStatement(tablet, isAligned, dataBaseName);
+ if (isTableModel) {
+ statement = new InsertTabletStatement(tablet, isAligned, dataBaseName);
+ } else {
+ statement = new InsertTabletStatement(tablet, isAligned, null);
+ if (Objects.nonNull(dataBaseName)) {
+ statement.setDatabaseName(dataBaseName);
+ }
+ }
+ return statement;
} catch (final MetadataException e) {
LOGGER.warn(DataNodePipeMessages.GENERATE_STATEMENT_FROM_TABLET_ERROR,
tablet, e);
return null;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 27740b12e07..3bcdcaf02e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -243,7 +243,7 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
insertNode,
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
?
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
- : null);
+ :
pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName());
if (!send(
pipeInsertNodeTabletInsertionEvent.getPipeName(),
@@ -290,7 +290,7 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
pipeRawTabletInsertionEvent.isAligned(),
pipeRawTabletInsertionEvent.isTableModelEvent()
? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
- : null))) {
+ : pipeRawTabletInsertionEvent.getTreeModelDatabaseName()))) {
final String errorMessage =
String.format(
"Transfer PipeRawTabletInsertionEvent %s error. Socket: %s.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 14c854fa2ad..9adbcf6cf16 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -300,7 +300,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
final String databaseName =
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
- : null;
+ : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName();
final TPipeTransferReq pipeTransferReq =
compressIfNeeded(
PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(insertNode,
databaseName));
@@ -327,7 +327,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
pipeRawTabletInsertionEvent.isAligned(),
pipeRawTabletInsertionEvent.isTableModelEvent()
? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
- : null));
+ :
pipeRawTabletInsertionEvent.getTreeModelDatabaseName()));
final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
new PipeTransferTabletRawEventHandler(
pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index 6cc416f4022..5e6297d8438 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -387,7 +387,7 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
insertNode,
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
?
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
- : null));
+ :
pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName()));
rateLimitIfNeeded(
pipeInsertNodeTabletInsertionEvent.getPipeName(),
pipeInsertNodeTabletInsertionEvent.getCreationTime(),
@@ -452,7 +452,7 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
pipeRawTabletInsertionEvent.isAligned(),
pipeRawTabletInsertionEvent.isTableModelEvent()
? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
- : null));
+ :
pipeRawTabletInsertionEvent.getTreeModelDatabaseName()));
rateLimitIfNeeded(
pipeRawTabletInsertionEvent.getPipeName(),
pipeRawTabletInsertionEvent.getCreationTime(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 4c0795f12cd..581792475c0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -121,8 +121,6 @@ public class WriteBackSink implements PipeConnector {
private UserEntity userEntity;
- private static final String TREE_MODEL_DATABASE_NAME_IDENTIFIER = null;
-
private static final SqlParser RELATIONAL_SQL_PARSER = new SqlParser();
private static final Set<String> ALREADY_CREATED_DATABASES =
ConcurrentHashMap.newKeySet();
@@ -268,7 +266,7 @@ public class WriteBackSink implements PipeConnector {
final String dataBaseName =
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
- : TREE_MODEL_DATABASE_NAME_IDENTIFIER;
+ : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName();
final InsertBaseStatement insertBaseStatement;
insertBaseStatement =
@@ -311,7 +309,7 @@ public class WriteBackSink implements PipeConnector {
final String dataBaseName =
pipeRawTabletInsertionEvent.isTableModelEvent()
? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
- : TREE_MODEL_DATABASE_NAME_IDENTIFIER;
+ : pipeRawTabletInsertionEvent.getTreeModelDatabaseName();
final InsertTabletStatement insertTabletStatement =
PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
index c5b9ebed4d5..e8b8e36cb49 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.util;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
@@ -219,16 +220,22 @@ public class TabletStatementConverter {
final String databaseName = ReadWriteIOUtils.readString(byteBuffer);
if (databaseName != null) {
statement.setDatabaseName(databaseName);
- statement.setWriteToTable(true);
- // For table model, insertTargetName is table name, convert to lowercase
- statement.setDevicePath(new
PartialPath(insertTargetName.toLowerCase(), false));
// Calculate memory for databaseName
memorySize +=
org.apache.tsfile.utils.RamUsageEstimator.sizeOf(databaseName);
- statement.setColumnCategories(columnCategories);
-
- memorySize += columnCategoriesMemorySize;
- memorySize += tagColumnIndicesSize;
+ if (PathUtils.isTableModelDatabase(databaseName)) {
+ statement.setWriteToTable(true);
+ // For table model, insertTargetName is table name, convert to lowercase
+ statement.setDevicePath(new
PartialPath(insertTargetName.toLowerCase(), false));
+ statement.setColumnCategories(columnCategories);
+
+ memorySize += columnCategoriesMemorySize;
+ memorySize += tagColumnIndicesSize;
+ } else {
+ statement.setDevicePath(
+
DataNodeDevicePathCache.getInstance().getPartialPath(insertTargetName));
+ statement.setColumnCategories(null);
+ }
} else {
// For tree model, use DataNodeDevicePathCache
statement.setDevicePath(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 1246ec9049c..38704ec7fea 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -44,6 +44,8 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.Cre
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -66,7 +68,9 @@ import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
public class PipeDataNodeThriftRequestTest {
@@ -201,6 +205,33 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertEquals(statement.getDatabaseName().get(), "test");
}
+ @Test
+ public void testPipeTransferInsertNodeReqV2WithTreeModelDatabase() {
+ final PipeTransferTabletInsertNodeReqV2 req =
+ PipeTransferTabletInsertNodeReqV2.toTPipeTransferReq(
+ new InsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath(new String[] {"root", "test", "d"}),
+ false,
+ new String[] {"s"},
+ new TSDataType[] {TSDataType.INT32},
+ 1,
+ new Object[] {1},
+ false),
+ "root.test");
+ final PipeTransferTabletInsertNodeReqV2 deserializeReq =
+ PipeTransferTabletInsertNodeReqV2.fromTPipeTransferReq(req);
+
+ final InsertBaseStatement statement = deserializeReq.constructStatement();
+ final List<PartialPath> paths = new ArrayList<>();
+ paths.add(new PartialPath(new String[] {"root", "test", "d", "s"}));
+
+ Assert.assertEquals(statement.getPaths(), paths);
+ Assert.assertFalse(statement.isWriteToTable());
+ Assert.assertTrue(statement.getDatabaseName().isPresent());
+ Assert.assertEquals("root.test", statement.getDatabaseName().get());
+ }
+
@Test
public void testPipeTransferTabletBinaryReq() {
// Not do real test here since "serializeToWal" needs private inner class of walBuffer
@@ -365,6 +396,39 @@ public class PipeDataNodeThriftRequestTest {
}
}
+ @Test
+ public void testPipeTransferTabletReqV2WithTreeModelDatabase() {
+ try {
+ final List<IMeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("s1", TSDataType.INT32));
+ schemaList.add(new MeasurementSchema("s2", TSDataType.TEXT));
+ final Tablet tablet = new Tablet("root.test.d", schemaList, 8);
+ tablet.addTimestamp(0, 2);
+ tablet.addTimestamp(1, 1);
+ tablet.addValue("s1", 0, 2);
+ tablet.addValue("s2", 0, "2");
+ tablet.addValue("s1", 1, 1);
+ tablet.addValue("s2", 1, "1");
+
+ final PipeTransferTabletRawReqV2 req =
+ PipeTransferTabletRawReqV2.toTPipeTransferReq(tablet, false,
"root.test");
+ final PipeTransferTabletRawReqV2 deserializeReq =
+ PipeTransferTabletRawReqV2.fromTPipeTransferReq(req);
+
+ final InsertBaseStatement statement =
deserializeReq.constructStatement();
+ final List<PartialPath> paths = new ArrayList<>();
+ paths.add(new PartialPath(new String[] {"root", "test", "d", "s1"}));
+ paths.add(new PartialPath(new String[] {"root", "test", "d", "s2"}));
+
+ Assert.assertEquals(paths, statement.getPaths());
+ Assert.assertFalse(statement.isWriteToTable());
+ Assert.assertTrue(statement.getDatabaseName().isPresent());
+ Assert.assertEquals("root.test", statement.getDatabaseName().get());
+ } catch (final IOException e) {
+ Assert.fail();
+ }
+ }
+
@Test
public void testPipeTransferTabletBatchReq() throws IOException {
final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
@@ -503,6 +567,94 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertEquals("test",
deserializedReq.getInsertNodeReqs().get(0).getDataBaseName());
}
+ @Test
+ public void testPipeTransferTabletBatchReqV2WithMultipleTreeModelDatabases()
throws IOException {
+ final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
+ final List<ByteBuffer> tabletBuffers = new ArrayList<>();
+ final List<String> insertDataBase = new ArrayList<>();
+ final List<String> tabletDataBase = new ArrayList<>();
+
+ insertNodeBuffers.add(
+ new InsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath(new String[] {"root", "db1", "d"}),
+ false,
+ new String[] {"s"},
+ new TSDataType[] {TSDataType.INT32},
+ 1,
+ new Object[] {1},
+ false)
+ .serializeToByteBuffer());
+ insertDataBase.add("root.db1");
+
+ insertNodeBuffers.add(
+ new InsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath(new String[] {"root", "db2", "d"}),
+ false,
+ new String[] {"s"},
+ new TSDataType[] {TSDataType.INT32},
+ 2,
+ new Object[] {2},
+ false)
+ .serializeToByteBuffer());
+ insertDataBase.add("root.db2");
+
+ final List<IMeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("s1", TSDataType.INT32));
+
+ final Tablet db1Tablet = new Tablet("root.db1.d", schemaList, 8);
+ db1Tablet.addTimestamp(0, 1);
+ db1Tablet.addValue("s1", 0, 1);
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ db1Tablet.serialize(outputStream);
+ ReadWriteIOUtils.write(false, outputStream);
+ tabletBuffers.add(
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size()));
+ tabletDataBase.add("root.db1");
+ }
+
+ final Tablet db2Tablet = new Tablet("root.db2.d", schemaList, 8);
+ db2Tablet.addTimestamp(0, 2);
+ db2Tablet.addValue("s1", 0, 2);
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ db2Tablet.serialize(outputStream);
+ ReadWriteIOUtils.write(false, outputStream);
+ tabletBuffers.add(
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size()));
+ tabletDataBase.add("root.db2");
+ }
+
+ final PipeTransferTabletBatchReqV2 req =
+ PipeTransferTabletBatchReqV2.toTPipeTransferReq(
+ insertNodeBuffers, tabletBuffers, insertDataBase, tabletDataBase);
+ final PipeTransferTabletBatchReqV2 deserializedReq =
+ PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req);
+
+ final List<InsertBaseStatement> statements =
deserializedReq.constructStatements();
+ final Set<String> insertRowsDatabases = new HashSet<>();
+ final Set<String> insertTabletsDatabases = new HashSet<>();
+
+ for (final InsertBaseStatement statement : statements) {
+ Assert.assertFalse(statement.isWriteToTable());
+ Assert.assertTrue(statement.getDatabaseName().isPresent());
+ if (statement instanceof InsertRowsStatement) {
+ insertRowsDatabases.add(statement.getDatabaseName().get());
+ } else if (statement instanceof InsertMultiTabletsStatement) {
+ insertTabletsDatabases.add(statement.getDatabaseName().get());
+ } else {
+ Assert.fail("Unexpected statement type: " +
statement.getClass().getName());
+ }
+ }
+
+ Assert.assertEquals(
+ new HashSet<>(java.util.Arrays.asList("root.db1", "root.db2")),
insertRowsDatabases);
+ Assert.assertEquals(
+ new HashSet<>(java.util.Arrays.asList("root.db1", "root.db2")),
insertTabletsDatabases);
+ }
+
@Test
public void testPipeTransferFilePieceReq() throws IOException {
final byte[] body = "testPipeTransferFilePieceReq".getBytes();