This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 48e9a752288 Pipe IT: Fix CI instability issues related to type
conversion on data sync receivers (#13340) (#13348)
48e9a752288 is described below
commit 48e9a7522882a84e81b9d1f3da98b0480ae05c21
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Aug 29 22:40:05 2024 +0800
Pipe IT: Fix CI instability issues related to type conversion on data sync
receivers (#13340) (#13348)
---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 10 ++++---
...peStatementDataTypeConvertExecutionVisitor.java | 31 ++--------------------
2 files changed, 8 insertions(+), 33 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 52bbe1f68bc..4f4779e75da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -614,10 +614,12 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
final TSStatus status = executeStatement(statement);
- return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- || !shouldConvertDataTypeOnTypeMismatch
- ? status
- : statement.accept(STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR,
status).orElse(status);
+ return shouldConvertDataTypeOnTypeMismatch
+ && ((statement instanceof InsertBaseStatement
+ && ((InsertBaseStatement)
statement).hasFailedMeasurements())
+ || status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode())
+ ? statement.accept(STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR,
status).orElse(status)
+ : status;
}
private static TSStatus executeStatement(final Statement statement) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index 9f669a75b8b..868785ec8fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.receiver.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
import
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
@@ -134,22 +133,12 @@ public class PipeStatementDataTypeConvertExecutionVisitor
@Override
public Optional<TSStatus> visitInsertRow(
final InsertRowStatement insertRowStatement, final TSStatus status) {
- return status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
- && status.getMessage() != null
- &&
status.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
- ? tryExecute(new PipeConvertedInsertRowStatement(insertRowStatement))
- : Optional.empty();
+ return tryExecute(new PipeConvertedInsertRowStatement(insertRowStatement));
}
@Override
public Optional<TSStatus> visitInsertRows(
final InsertRowsStatement insertRowsStatement, final TSStatus status) {
- if (!((status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
- || status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode())
- &&
status.toString().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING))) {
- return Optional.empty();
- }
-
if (insertRowsStatement.getInsertRowStatementList() == null
|| insertRowsStatement.getInsertRowStatementList().isEmpty()) {
return Optional.empty();
@@ -166,12 +155,6 @@ public class PipeStatementDataTypeConvertExecutionVisitor
@Override
public Optional<TSStatus> visitInsertRowsOfOneDevice(
final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement,
final TSStatus status) {
- if (!((status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
- || status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode())
- &&
status.toString().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING))) {
- return Optional.empty();
- }
-
if (insertRowsOfOneDeviceStatement.getInsertRowStatementList() == null
||
insertRowsOfOneDeviceStatement.getInsertRowStatementList().isEmpty()) {
return Optional.empty();
@@ -189,22 +172,12 @@ public class PipeStatementDataTypeConvertExecutionVisitor
@Override
public Optional<TSStatus> visitInsertTablet(
final InsertTabletStatement insertTabletStatement, final TSStatus
status) {
- return status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
- && status.getMessage() != null
- &&
status.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
- ? tryExecute(new
PipeConvertedInsertTabletStatement(insertTabletStatement))
- : Optional.empty();
+ return tryExecute(new
PipeConvertedInsertTabletStatement(insertTabletStatement));
}
@Override
public Optional<TSStatus> visitInsertMultiTablets(
final InsertMultiTabletsStatement insertMultiTabletsStatement, final
TSStatus status) {
- if (!((status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
- || status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode())
- &&
status.toString().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING))) {
- return Optional.empty();
- }
-
if (insertMultiTabletsStatement.getInsertTabletStatementList() == null
||
insertMultiTabletsStatement.getInsertTabletStatementList().isEmpty()) {
return Optional.empty();