This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 90055d55b6e Fix pipe receiver type conversion load path (#17849)
90055d55b6e is described below
commit 90055d55b6e1166580824e5cbed77a7c253ef514
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 11:20:37 2026 +0800
Fix pipe receiver type conversion load path (#17849)
* Add pipe type conversion semantic ITs
* Fix float-to-date semantic IT expectation
* Add aligned stream pipe type conversion IT
* Fix pipe table type conversion load path
* Fix alter type IT pipe memory flakiness
---
.../db/it/schema/IoTDBAlterTimeSeriesTypeIT.java | 7 +-
.../pipe/it/dual/TypeConversionSemanticCase.java | 315 +++++++++++++++++++++
.../IoTDBPipeTypeConversionSemanticIT.java | 255 +++++++++++++++++
.../IoTDBPipeTypeConversionSemanticIT.java | 259 +++++++++++++++++
.../it/schema/IoTDBAlterColumnTypeIT.java | 9 +-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 44 ++-
.../plan/analyze/load/LoadTsFileAnalyzer.java | 3 +-
.../analyze/load/LoadTsFileTableSchemaCache.java | 31 +-
.../protocol/thrift/IoTDBDataNodeReceiverTest.java | 43 ++-
.../plan/analyze/load/LoadTsFileAnalyzerTest.java | 69 ++++-
10 files changed, 1003 insertions(+), 32 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java
index e5fa5ca59a7..fcde3f61e37 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java
@@ -98,6 +98,7 @@ public class IoTDBAlterTimeSeriesTypeIT {
@BeforeClass
public static void setUp() throws Exception {
+
EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
EnvFactory.getEnv().getConfig().getDataNodeConfig().setCompactionScheduleInterval(1000);
EnvFactory.getEnv().initClusterEnvironment();
}
@@ -878,11 +879,7 @@ public class IoTDBAlterTimeSeriesTypeIT {
session.executeQueryStatement("select count(s1) from " + database +
".load_and_alter");
RowRecord rec;
rec = dataSet.next();
- // Due to the operation of load tsfile execute directly, don't access
memtable or generate
- // InsertNode object, so don't need to check the data type.
- // When query this measurement point, will only find the data of
TSDataType.INT32. So this is
- // reason what cause we can't find the data of TSDataType.DOUBLE. So
result is 9, is not 15.
- // assertEquals(15, rec.getFields().get(0).getLongV());
+ // Before alter, DOUBLE TsFiles loaded directly are invisible under the
existing INT32 schema.
assertEquals(9, rec.getFields().get(0).getLongV());
assertFalse(dataSet.hasNext());
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java
new file mode 100644
index 00000000000..3ece5cf865e
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual;
+
+import org.apache.iotdb.rpc.RpcUtils;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.BytesUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.List;
+
+public class TypeConversionSemanticCase {
+
+ public static final int ROW_COUNT = 3;
+
+ public static final List<TypeConversionSemanticCase> CASES =
+ Arrays.asList(
+ c(
+ "bool_to_int32",
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ values("true", "false", "true"),
+ values("1", "0", "1")),
+ c(
+ "bool_to_int64",
+ TSDataType.BOOLEAN,
+ TSDataType.INT64,
+ values("true", "false", "true"),
+ values("1", "0", "1")),
+ c(
+ "bool_to_float",
+ TSDataType.BOOLEAN,
+ TSDataType.FLOAT,
+ values("true", "false", "true"),
+ values("1.0", "0.0", "1.0")),
+ c(
+ "bool_to_double",
+ TSDataType.BOOLEAN,
+ TSDataType.DOUBLE,
+ values("true", "false", "true"),
+ values("1.0", "0.0", "1.0")),
+ c(
+ "bool_to_text",
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT,
+ values("true", "false", "true"),
+ values("true", "false", "true")),
+ c(
+ "bool_to_blob",
+ TSDataType.BOOLEAN,
+ TSDataType.BLOB,
+ values("true", "false", "true"),
+ values(blobValue("true"), blobValue("false"),
blobValue("true"))),
+ c(
+ "bool_to_string",
+ TSDataType.BOOLEAN,
+ TSDataType.STRING,
+ values("true", "false", "true"),
+ values("true", "false", "true")),
+ c(
+ "bool_to_date",
+ TSDataType.BOOLEAN,
+ TSDataType.DATE,
+ values("true", "false", "true"),
+ values("1970-01-02", "1970-01-01", "1970-01-02")),
+ c(
+ "bool_to_timestamp",
+ TSDataType.BOOLEAN,
+ TSDataType.TIMESTAMP,
+ values("true", "false", "true"),
+ values(timestampValue(1), timestampValue(0), timestampValue(1))),
+ c(
+ "int32_to_boolean",
+ TSDataType.INT32,
+ TSDataType.BOOLEAN,
+ values("0", "2", "-1"),
+ values("false", "true", "true")),
+ c(
+ "int32_to_timestamp",
+ TSDataType.INT32,
+ TSDataType.TIMESTAMP,
+ values("0", "1", "86400000"),
+ values(timestampValue(0), timestampValue(1),
timestampValue(86400000))),
+ c(
+ "int32_to_date",
+ TSDataType.INT32,
+ TSDataType.DATE,
+ values("19700102", "20240229", "42"),
+ values("1970-01-02", "2024-02-29", "1970-01-01")),
+ c(
+ "int64_to_int32",
+ TSDataType.INT64,
+ TSDataType.INT32,
+ values("2147483648", "-2147483649", "42"),
+ values("-2147483648", "2147483647", "42")),
+ c(
+ "int64_to_boolean",
+ TSDataType.INT64,
+ TSDataType.BOOLEAN,
+ values("0", "2", "-1"),
+ values("false", "true", "true")),
+ c(
+ "int64_to_date",
+ TSDataType.INT64,
+ TSDataType.DATE,
+ values("19700102", "2147483648", "19700103"),
+ values("1970-01-02", "1970-01-01", "1970-01-03")),
+ c(
+ "float_to_int32",
+ TSDataType.FLOAT,
+ TSDataType.INT32,
+ values("2.9", "-2.9", "0.0"),
+ values("2", "-2", "0")),
+ c(
+ "float_to_boolean",
+ TSDataType.FLOAT,
+ TSDataType.BOOLEAN,
+ values("0.0", "0.1", "-0.1"),
+ values("false", "true", "true")),
+ c(
+ "float_to_date",
+ TSDataType.FLOAT,
+ TSDataType.DATE,
+ values("19700102.0", "19700104.0", "42.9"),
+ values("1970-01-02", "1970-01-04", "1970-01-01")),
+ c(
+ "double_to_int64",
+ TSDataType.DOUBLE,
+ TSDataType.INT64,
+ values("3.9", "-3.9", "0.0"),
+ values("3", "-3", "0")),
+ c(
+ "double_to_boolean",
+ TSDataType.DOUBLE,
+ TSDataType.BOOLEAN,
+ values("0.0", "0.1", "-0.1"),
+ values("false", "true", "true")),
+ c(
+ "double_to_timestamp",
+ TSDataType.DOUBLE,
+ TSDataType.TIMESTAMP,
+ values("1.9", "86400000.9", "0.0"),
+ values(timestampValue(1), timestampValue(86400000),
timestampValue(0))),
+ c(
+ "text_to_int32",
+ TSDataType.TEXT,
+ TSDataType.INT32,
+ values("'123.9'", "'bad'", "'-123.9'"),
+ values("123", "0", "-123")),
+ c(
+ "string_to_int64",
+ TSDataType.STRING,
+ TSDataType.INT64,
+ values("'456.9'", "'bad'", "'-456.9'"),
+ values("456", "0", "-456")),
+ c(
+ "blob_to_float",
+ TSDataType.BLOB,
+ TSDataType.FLOAT,
+ values(blobSql("7.5"), blobSql("bad"), blobSql("-7.5")),
+ values("7.5", "0.0", "-7.5")),
+ c(
+ "text_to_double",
+ TSDataType.TEXT,
+ TSDataType.DOUBLE,
+ values("'8.25'", "'bad'", "'-8.25'"),
+ values("8.25", "0.0", "-8.25")),
+ c(
+ "text_to_boolean",
+ TSDataType.TEXT,
+ TSDataType.BOOLEAN,
+ values("'true'", "'1'", "'TrUe'"),
+ values("true", "false", "true")),
+ c(
+ "string_to_boolean",
+ TSDataType.STRING,
+ TSDataType.BOOLEAN,
+ values("'TRUE'", "'false'", "'yes'"),
+ values("true", "false", "false")),
+ c(
+ "blob_to_boolean",
+ TSDataType.BLOB,
+ TSDataType.BOOLEAN,
+ values(blobSql("true"), blobSql("FALSE"), blobSql("0")),
+ values("true", "false", "false")),
+ c(
+ "text_to_timestamp",
+ TSDataType.TEXT,
+ TSDataType.TIMESTAMP,
+ values("'86400000'", "'1970-01-02T00:00:00.000'", "'bad'"),
+ values(timestampValue(86400000), timestampValue(86400000),
timestampValue(0))),
+ c(
+ "string_to_timestamp",
+ TSDataType.STRING,
+ TSDataType.TIMESTAMP,
+ values("'1970-01-03T00:00:00.000'", "'bad'", "'86400000'"),
+ values(timestampValue(172800000), timestampValue(0),
timestampValue(86400000))),
+ c(
+ "blob_to_timestamp",
+ TSDataType.BLOB,
+ TSDataType.TIMESTAMP,
+ values(blobSql("bad"), blobSql("1"),
blobSql("1970-01-02T00:00:00.000")),
+ values(timestampValue(0), timestampValue(1),
timestampValue(86400000))),
+ c(
+ "text_to_date",
+ TSDataType.TEXT,
+ TSDataType.DATE,
+ values("'19700102'", "'1970-01-04'", "'bad'"),
+ values("1970-01-02", "1970-01-04", "1970-01-01")),
+ c(
+ "string_to_date",
+ TSDataType.STRING,
+ TSDataType.DATE,
+ values("'1970-01-03'", "'19700105'", "'1970-01-07'"),
+ values("1970-01-03", "1970-01-05", "1970-01-07")),
+ c(
+ "blob_to_date",
+ TSDataType.BLOB,
+ TSDataType.DATE,
+ values(blobSql("bad"), blobSql("1970-01-06"),
blobSql("19700108")),
+ values("1970-01-01", "1970-01-06", "1970-01-08")),
+ c(
+ "timestamp_to_date",
+ TSDataType.TIMESTAMP,
+ TSDataType.DATE,
+ values("0", "86399999", "86400000"),
+ values("1970-01-01", "1970-01-01", "1970-01-02")),
+ c(
+ "date_to_timestamp",
+ TSDataType.DATE,
+ TSDataType.TIMESTAMP,
+ values("'1970-01-01'", "'1970-01-02'", "'1970-01-03'"),
+ values(timestampValue(0), timestampValue(86400000),
timestampValue(172800000))),
+ c(
+ "timestamp_to_boolean",
+ TSDataType.TIMESTAMP,
+ TSDataType.BOOLEAN,
+ values("0", "-1", "1"),
+ values("false", "true", "true")),
+ c(
+ "date_to_boolean",
+ TSDataType.DATE,
+ TSDataType.BOOLEAN,
+ values("'1970-01-01'", "'1970-01-02'", "'1969-12-31'"),
+ values("false", "true", "true")));
+
+ public final String measurement;
+ public final TSDataType sourceType;
+ public final TSDataType targetType;
+ public final String[] sourceSqlValues;
+ public final String[] expectedValues;
+
+ private TypeConversionSemanticCase(
+ final String measurement,
+ final TSDataType sourceType,
+ final TSDataType targetType,
+ final String[] sourceSqlValues,
+ final String[] expectedValues) {
+ this.measurement = measurement;
+ this.sourceType = sourceType;
+ this.targetType = targetType;
+ this.sourceSqlValues = sourceSqlValues;
+ this.expectedValues = expectedValues;
+ }
+
+ private static TypeConversionSemanticCase c(
+ final String measurement,
+ final TSDataType sourceType,
+ final TSDataType targetType,
+ final String[] sourceSqlValues,
+ final String[] expectedValues) {
+ return new TypeConversionSemanticCase(
+ measurement, sourceType, targetType, sourceSqlValues, expectedValues);
+ }
+
+ private static String[] values(final String... values) {
+ return values;
+ }
+
+ public static String timestampValue(final long timestamp) {
+ return RpcUtils.formatDatetime("default", "ms", timestamp, ZoneOffset.UTC);
+ }
+
+ private static String blobSql(final String value) {
+ final StringBuilder builder = new StringBuilder("X'");
+ for (final byte b : value.getBytes(StandardCharsets.UTF_8)) {
+ builder.append(String.format("%02x", b & 0xFF));
+ }
+ return builder.append("'").toString();
+ }
+
+ private static String blobValue(final String value) {
+ return
BytesUtils.parseBlobByteArrayToString(value.getBytes(StandardCharsets.UTF_8));
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java
new file mode 100644
index 00000000000..6704d37b87e
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual.tablemodel.manual.enhanced;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.pipe.it.dual.TypeConversionSemanticCase;
+import
org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2DualTableManualEnhanced.class})
+public class IoTDBPipeTypeConversionSemanticIT extends
AbstractPipeTableModelDualManualIT {
+
+ private static final String DATABASE = "pipe_type_conversion";
+ private static final String TABLE = "semantic_conversion";
+ private static final String STREAM_TABLE = "semantic_stream_conversion";
+ private static final List<TypeConversionSemanticCase> STREAM_CASES =
+ getCases(
+ "bool_to_int32",
+ "bool_to_int64",
+ "bool_to_float",
+ "bool_to_double",
+ "bool_to_blob",
+ "bool_to_date",
+ "bool_to_timestamp",
+ "int32_to_boolean",
+ "int32_to_timestamp",
+ "int32_to_date");
+
+ @Override
+ @Before
+ public void setUp() {
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+ setupConfig();
+ senderEnv.initClusterEnvironment(1, 1);
+ receiverEnv.initClusterEnvironment(1, 1);
+ }
+
+ @Override
+ protected void setupConfig() {
+ super.setupConfig();
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setDataReplicationFactor(1)
+ .setSchemaReplicationFactor(1);
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setDataReplicationFactor(1)
+ .setSchemaReplicationFactor(1);
+ }
+
+ @Test
+ public void testPipeReceiverTypeConversionSemantics() {
+ createDatabaseAndTable(senderEnv, TABLE, TypeConversionSemanticCase.CASES,
true);
+ createDatabaseAndTable(receiverEnv, TABLE,
TypeConversionSemanticCase.CASES, false);
+ createPipe();
+
+ TestUtils.executeNonQueries(
+ DATABASE,
+ BaseEnv.TABLE_SQL_DIALECT,
+ senderEnv,
+ createInsertStatements(TABLE, TypeConversionSemanticCase.CASES),
+ null);
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ createQuerySql(TABLE, TypeConversionSemanticCase.CASES),
+ createExpectedHeader(TypeConversionSemanticCase.CASES),
+ new HashSet<>(createExpectedRows(TypeConversionSemanticCase.CASES)),
+ 60,
+ DATABASE,
+ null);
+ }
+
+ @Test
+ public void testStreamPipeReceiverTypeConversionSemantics() {
+ createDatabaseAndTable(senderEnv, STREAM_TABLE, STREAM_CASES, true);
+ createDatabaseAndTable(receiverEnv, STREAM_TABLE, STREAM_CASES, false);
+ createStreamPipe();
+
+ TestUtils.executeNonQueries(
+ DATABASE,
+ BaseEnv.TABLE_SQL_DIALECT,
+ senderEnv,
+ createInsertStatements(STREAM_TABLE, STREAM_CASES),
+ null);
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ createQuerySql(STREAM_TABLE, STREAM_CASES),
+ createExpectedHeader(STREAM_CASES),
+ new HashSet<>(createExpectedRows(STREAM_CASES)),
+ 60,
+ DATABASE,
+ null);
+ }
+
+ private static void createDatabaseAndTable(
+ final BaseEnv env,
+ final String table,
+ final List<TypeConversionSemanticCase> conversionCases,
+ final boolean useSourceType) {
+ final List<String> sqls = new ArrayList<>();
+ sqls.add("create database if not exists " + DATABASE);
+ sqls.add("use " + DATABASE);
+ final List<String> columns = new ArrayList<>();
+ columns.add("tag_id string tag");
+ for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+ columns.add(
+ String.format(
+ "%s %s field",
+ conversionCase.measurement,
+ useSourceType ? conversionCase.sourceType :
conversionCase.targetType));
+ }
+ sqls.add(String.format("create table %s (%s)", table, String.join(",",
columns)));
+ TestUtils.executeNonQueries(null, BaseEnv.TABLE_SQL_DIALECT, env, sqls,
null);
+ }
+
+ private void createPipe() {
+ TestUtils.executeNonQuery(
+ DATABASE,
+ BaseEnv.TABLE_SQL_DIALECT,
+ senderEnv,
+ String.format(
+ "create pipe type_conversion_semantic"
+ + " with source
('source'='iotdb-source','history.enable'='false','realtime.enable'='true','realtime.mode'='forced-log')"
+ + " with processor ('processor'='do-nothing-processor')"
+ + " with sink
('node-urls'='%s','batch.enable'='false','sink.format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()),
+ null);
+ }
+
+ private void createStreamPipe() {
+ TestUtils.executeNonQuery(
+ DATABASE,
+ BaseEnv.TABLE_SQL_DIALECT,
+ senderEnv,
+ String.format(
+ "create pipe stream_type_conversion_semantic"
+ + " with source
('source'='iotdb-source','history.enable'='false','realtime.enable'='true','realtime.mode'='stream')"
+ + " with processor ('processor'='do-nothing-processor')"
+ + " with sink
('sink'='iotdb-thrift-sink','sink.node-urls'='%s')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()),
+ null);
+ }
+
+ private static List<String> createInsertStatements(
+ final String table, final List<TypeConversionSemanticCase>
conversionCases) {
+ final List<String> sqls = new ArrayList<>();
+ final String measurements =
+ String.join(
+ ",",
+ conversionCases.stream()
+ .map(conversionCase -> conversionCase.measurement)
+ .toArray(String[]::new));
+ for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) {
+ final List<String> values = new ArrayList<>();
+ for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+ values.add(conversionCase.sourceSqlValues[row]);
+ }
+ sqls.add(
+ String.format(
+ "insert into %s(time,tag_id,%s) values (%d,'d',%s)",
+ table, measurements, row + 1, String.join(",", values)));
+ }
+ sqls.add("flush");
+ return sqls;
+ }
+
+ private static String createQuerySql(
+ final String table, final List<TypeConversionSemanticCase>
conversionCases) {
+ return String.format(
+ "select %s,time from %s where tag_id='d'",
+ String.join(
+ ",",
+ conversionCases.stream()
+ .map(conversionCase -> conversionCase.measurement)
+ .toArray(String[]::new)),
+ table);
+ }
+
+ private static String createExpectedHeader(
+ final List<TypeConversionSemanticCase> conversionCases) {
+ final List<String> columns = new ArrayList<>();
+ for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+ columns.add(conversionCase.measurement);
+ }
+ columns.add("time");
+ return String.join(",", columns) + ",";
+ }
+
+ private static List<String> createExpectedRows(
+ final List<TypeConversionSemanticCase> conversionCases) {
+ final List<String> rows = new ArrayList<>();
+ for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) {
+ final List<String> values = new ArrayList<>();
+ for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+ values.add(conversionCase.expectedValues[row]);
+ }
+ values.add(TypeConversionSemanticCase.timestampValue(row + 1));
+ rows.add(String.join(",", values) + ",");
+ }
+ return rows;
+ }
+
+ private static List<TypeConversionSemanticCase> getCases(final String...
measurements) {
+ final List<TypeConversionSemanticCase> cases = new ArrayList<>();
+ for (final String measurement : measurements) {
+ cases.add(getCase(measurement));
+ }
+ return cases;
+ }
+
+ private static TypeConversionSemanticCase getCase(final String measurement) {
+ for (final TypeConversionSemanticCase conversionCase :
TypeConversionSemanticCase.CASES) {
+ if (conversionCase.measurement.equals(measurement)) {
+ return conversionCase;
+ }
+ }
+ throw new IllegalArgumentException("Unknown type conversion semantic case:
" + measurement);
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java
new file mode 100644
index 00000000000..b2218003379
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual.treemodel.auto.enhanced;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoEnhanced;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.pipe.it.dual.TypeConversionSemanticCase;
+import
org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2DualTreeAutoEnhanced.class})
+public class IoTDBPipeTypeConversionSemanticIT extends
AbstractPipeDualTreeModelAutoIT {
+
+ private static final String DEVICE = "root.pipe_type_conversion.d";
+ private static final String ALIGNED_DEVICE =
"root.pipe_type_conversion.aligned_d";
+ private static final List<TypeConversionSemanticCase> ALIGNED_STREAM_CASES =
+ getCases(
+ "bool_to_int32",
+ "bool_to_int64",
+ "bool_to_float",
+ "bool_to_double",
+ "bool_to_blob",
+ "bool_to_date",
+ "bool_to_timestamp",
+ "int32_to_boolean",
+ "int32_to_timestamp",
+ "int32_to_date");
+
+ @Override
+ @Before
+ public void setUp() {
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+ setupConfig();
+ senderEnv.initClusterEnvironment(1, 1);
+ receiverEnv.initClusterEnvironment(1, 1);
+ }
+
+ @Override
+ protected void setupConfig() {
+ super.setupConfig();
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setDataReplicationFactor(1)
+ .setSchemaReplicationFactor(1);
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setDataReplicationFactor(1)
+ .setSchemaReplicationFactor(1);
+ }
+
+ @Test
+ public void testPipeReceiverTypeConversionSemantics() {
+ createTimeseries(senderEnv, DEVICE, TypeConversionSemanticCase.CASES,
true);
+ createTimeseries(receiverEnv, DEVICE, TypeConversionSemanticCase.CASES,
false);
+ createPipe();
+
+ TestUtils.executeNonQueries(
+ senderEnv, createInsertStatements(DEVICE,
TypeConversionSemanticCase.CASES, false), null);
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ createQuerySql(DEVICE, TypeConversionSemanticCase.CASES),
+ createExpectedHeader(DEVICE, TypeConversionSemanticCase.CASES),
+ new HashSet<>(createExpectedRows(TypeConversionSemanticCase.CASES)),
+ 60);
+ }
+
+ @Test
+ public void testAlignedStreamPipeReceiverTypeConversionSemantics() {
+ createAlignedTimeseries(senderEnv, ALIGNED_DEVICE, ALIGNED_STREAM_CASES,
true);
+ createAlignedTimeseries(receiverEnv, ALIGNED_DEVICE, ALIGNED_STREAM_CASES,
false);
+ createStreamPipe(ALIGNED_DEVICE);
+
+ TestUtils.executeNonQueries(
+ senderEnv, createInsertStatements(ALIGNED_DEVICE,
ALIGNED_STREAM_CASES, true), null);
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ createQuerySql(ALIGNED_DEVICE, ALIGNED_STREAM_CASES),
+ createExpectedHeader(ALIGNED_DEVICE, ALIGNED_STREAM_CASES),
+ new HashSet<>(createExpectedRows(ALIGNED_STREAM_CASES)),
+ 60);
+ }
+
+ private static void createTimeseries(
+ final BaseEnv env,
+ final String device,
+ final List<TypeConversionSemanticCase> conversionCases,
+ final boolean useSourceType) {
+ final List<String> sqls = new ArrayList<>();
+ for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+ sqls.add(
+ String.format(
+ "create timeseries %s.%s with datatype=%s,encoding=PLAIN",
+ device,
+ conversionCase.measurement,
+ useSourceType ? conversionCase.sourceType :
conversionCase.targetType));
+ }
+ TestUtils.executeNonQueries(env, sqls, null);
+ }
+
+ private static void createAlignedTimeseries(
+ final BaseEnv env,
+ final String device,
+ final List<TypeConversionSemanticCase> conversionCases,
+ final boolean useSourceType) {
+ final List<String> measurements = new ArrayList<>();
+ for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+ measurements.add(
+ String.format(
+ "%s %s encoding=PLAIN",
+ conversionCase.measurement,
+ useSourceType ? conversionCase.sourceType :
conversionCase.targetType));
+ }
+ TestUtils.executeNonQuery(
+ env,
+ String.format("create aligned timeseries %s(%s)", device,
String.join(",", measurements)),
+ null);
+ }
+
+ private void createPipe() {
+ TestUtils.executeNonQuery(
+ senderEnv,
+ String.format(
+ "create pipe type_conversion_semantic"
+ + " with source
('source'='iotdb-source','source.path'='%s.**','history.enable'='false','realtime.mode'='forced-log')"
+ + " with processor ('processor'='do-nothing-processor')"
+ + " with sink
('node-urls'='%s','batch.enable'='false','sink.format'='tablet')",
+ DEVICE,
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()),
+ null);
+ }
+
+ private void createStreamPipe(final String device) {
+ TestUtils.executeNonQuery(
+ senderEnv,
+ String.format(
+ "create pipe aligned_type_conversion_semantic"
+ + " with source
('source'='iotdb-source','source.path'='%s.**','history.enable'='false','realtime.mode'='stream')"
+ + " with processor ('processor'='do-nothing-processor')"
+ + " with sink
('sink'='iotdb-thrift-sink','sink.node-urls'='%s')",
+ device,
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()),
+ null);
+ }
+
+ private static List<String> createInsertStatements(
+ final String device,
+ final List<TypeConversionSemanticCase> conversionCases,
+ final boolean isAligned) {
+ final List<String> sqls = new ArrayList<>();
+ final String measurements =
+ String.join(
+ ",",
+ conversionCases.stream()
+ .map(conversionCase -> conversionCase.measurement)
+ .toArray(String[]::new));
+ for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) {
+ final List<String> values = new ArrayList<>();
+ for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+ values.add(conversionCase.sourceSqlValues[row]);
+ }
+ sqls.add(
+ String.format(
+ "insert into %s(time,%s)%s values (%d,%s)",
+ device,
+ measurements,
+ isAligned ? " aligned" : "",
+ row + 1,
+ String.join(",", values)));
+ }
+ sqls.add("flush");
+ return sqls;
+ }
+
+ private static String createQuerySql(
+ final String device, final List<TypeConversionSemanticCase>
conversionCases) {
+ return String.format(
+ "select %s from %s",
+ String.join(
+ ",",
+ conversionCases.stream()
+ .map(conversionCase -> conversionCase.measurement)
+ .toArray(String[]::new)),
+ device);
+ }
+
+ private static String createExpectedHeader(
+ final String device, final List<TypeConversionSemanticCase>
conversionCases) {
+ final List<String> columns = new ArrayList<>();
+ columns.add("Time");
+ for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+ columns.add(device + "." + conversionCase.measurement);
+ }
+ return String.join(",", columns) + ",";
+ }
+
+ private static List<String> createExpectedRows(
+ final List<TypeConversionSemanticCase> conversionCases) {
+ final List<String> rows = new ArrayList<>();
+ for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) {
+ final List<String> values = new ArrayList<>();
+ values.add(Integer.toString(row + 1));
+ for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+ values.add(conversionCase.expectedValues[row]);
+ }
+ rows.add(String.join(",", values) + ",");
+ }
+ return rows;
+ }
+
+ private static List<TypeConversionSemanticCase> getCases(final String...
measurements) {
+ final List<TypeConversionSemanticCase> cases = new ArrayList<>();
+ for (final String measurement : measurements) {
+ cases.add(getCase(measurement));
+ }
+ return cases;
+ }
+
+ private static TypeConversionSemanticCase getCase(final String measurement) {
+ for (final TypeConversionSemanticCase conversionCase :
TypeConversionSemanticCase.CASES) {
+ if (conversionCase.measurement.equals(measurement)) {
+ return conversionCase;
+ }
+ }
+ throw new IllegalArgumentException("Unknown type conversion semantic case:
" + measurement);
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
index 0903a0a3a0d..f04b6682962 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
@@ -99,6 +99,7 @@ public class IoTDBAlterColumnTypeIT {
@BeforeClass
public static void setUp() throws Exception {
+
EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
EnvFactory.getEnv().getConfig().getDataNodeConfig().setCompactionScheduleInterval(1000);
EnvFactory.getEnv().initClusterEnvironment();
try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
@@ -936,12 +937,8 @@ public class IoTDBAlterColumnTypeIT {
session.executeQueryStatement("select count(s1) from
load_and_alter");
RowRecord rec;
rec = dataSet.next();
- // Due to the operation of load tsfile execute directly, don't access
memtable or generate
- // InsertNode object, so don't need to check the data type.
- // When query this measurement point, will only find the data of
TSDataType.INT32. So this is
- // reason what cause we can't find the data of TSDataType.DOUBLE. So
result is 9, is not 18.
- // assertEquals(18, rec.getFields().get(0).getLongV());
- assertEquals(9, rec.getFields().get(0).getLongV());
+ // LOAD converts TsFiles with mismatched types into tablets by default.
+ assertEquals(18, rec.getFields().get(0).getLongV());
assertFalse(dataSet.hasNext());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 731c2eb50f0..e6c8ddefc99 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -590,7 +590,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
dataBaseName,
LoadTsFileStatement.getDatabaseLevelByTreeDatabase(dataBaseName),
shouldConvertDataTypeOnTypeMismatch,
- validateTsFile,
+ validateTsFile || shouldConvertDataTypeOnTypeMismatch,
null,
shouldMarkAsPipeRequest);
}
@@ -598,16 +598,23 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private TSStatus loadTsFileSync(final String dataBaseName, final String
fileAbsolutePath)
throws FileNotFoundException {
return executeStatementAndClassifyExceptions(
- buildLoadTsFileStatementForSync(dataBaseName, fileAbsolutePath,
validateTsFile.get()));
+ buildLoadTsFileStatementForSync(
+ dataBaseName,
+ fileAbsolutePath,
+ validateTsFile.get(),
+ shouldConvertDataTypeOnTypeMismatch));
}
static LoadTsFileStatement buildLoadTsFileStatementForSync(
- final String dataBaseName, final String fileAbsolutePath, final boolean
validateTsFile)
+ final String dataBaseName,
+ final String fileAbsolutePath,
+ final boolean validateTsFile,
+ final boolean shouldConvertDataTypeOnTypeMismatch)
throws FileNotFoundException {
final LoadTsFileStatement statement =
LoadTsFileStatement.createUnchecked(fileAbsolutePath);
statement.setDeleteAfterLoad(true);
- statement.setConvertOnTypeMismatch(true);
- statement.setVerifySchema(validateTsFile);
+ statement.setConvertOnTypeMismatch(shouldConvertDataTypeOnTypeMismatch);
+ statement.setVerifySchema(validateTsFile ||
shouldConvertDataTypeOnTypeMismatch);
statement.setAutoCreateDatabase(
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
statement.setDatabase(dataBaseName);
@@ -956,14 +963,26 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
}
+ // Execute insert statements through the conversion wrapper first to avoid
writing a partial
+ // row/tablet before the type mismatch is converted.
+ if (shouldConvertDataTypeOnTypeMismatch && statement instanceof
InsertBaseStatement) {
+ final Optional<TSStatus> convertedStatus =
+ executeInsertStatementWithDataTypeConversion(
+ statement, isTableModelStatement, databaseName);
+ if (convertedStatus.isPresent()) {
+ return convertedStatus.get();
+ }
+ }
+
// Real execution of the statement
final TSStatus status =
isTableModelStatement
? executeStatementForTableModel(statement, databaseName)
: executeStatementForTreeModel(statement);
- // Try to convert data type if the statement is a tree model statement
- // and the status code is not success
+ // Try to convert data type if the status code is not success. Insert
statements normally return
+ // above after the first converted execution. The retry path is kept for
load and fallback
+ // cases.
if (!shouldConvertDataTypeOnTypeMismatch
|| !((statement instanceof InsertBaseStatement
&& ((InsertBaseStatement) statement).hasFailedMeasurements())
@@ -986,6 +1005,17 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
: statement.accept(treeStatementDataTypeConvertExecutionVisitor,
status).orElse(status);
}
+ private Optional<TSStatus> executeInsertStatementWithDataTypeConversion(
+ final Statement statement, final boolean isTableModelStatement, final
String databaseName) {
+ return isTableModelStatement
+ ? statement.accept(
+ tableStatementDataTypeConvertExecutionVisitor,
+ new Pair<>(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), databaseName))
+ : statement.accept(
+ treeStatementDataTypeConvertExecutionVisitor,
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
private boolean shouldUseTableModelVisitorForLoadStatement(
final LoadTsFileStatement loadTsFileStatement) {
final List<Boolean> isTableModel = loadTsFileStatement.getIsTableModel();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 5171ccd93c3..e357b86e0c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -619,7 +619,8 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private LoadTsFileTableSchemaCache getOrCreateTableSchemaCache() {
if (tableSchemaCache == null) {
- tableSchemaCache = new LoadTsFileTableSchemaCache(metadata, context,
isAutoCreateDatabase);
+ tableSchemaCache =
+ new LoadTsFileTableSchemaCache(metadata, context,
isAutoCreateDatabase, isVerifySchema);
}
return tableSchemaCache;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
index c3c0ea73310..92a56d29a55 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
+import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -98,6 +99,7 @@ public class LoadTsFileTableSchemaCache {
private Map<String, org.apache.tsfile.file.metadata.TableSchema>
tableSchemaMap;
private final Metadata metadata;
private final MPPQueryContext context;
+ private final boolean shouldVerifyDataType;
private Map<String, Set<IDeviceID>> currentBatchTable2Devices;
@@ -116,13 +118,17 @@ public class LoadTsFileTableSchemaCache {
private final AtomicBoolean needDecode4DifferentTimeColumn = new
AtomicBoolean(false);
public LoadTsFileTableSchemaCache(
- final Metadata metadata, final MPPQueryContext context, final boolean
needToCreateDatabase)
+ final Metadata metadata,
+ final MPPQueryContext context,
+ final boolean needToCreateDatabase,
+ final boolean shouldVerifyDataType)
throws LoadRuntimeOutOfMemoryException {
this.block =
LoadTsFileMemoryManager.getInstance()
.allocateMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
this.metadata = metadata;
this.context = context;
+ this.shouldVerifyDataType = shouldVerifyDataType;
this.currentBatchTable2Devices = new HashMap<>();
this.currentModifications = PatternTreeMapFactory.getModsPatternTreeMap();
this.needToCreateDatabase = needToCreateDatabase;
@@ -389,14 +395,25 @@ public class LoadTsFileTableSchemaCache {
}
} else if (fileColumn.getColumnCategory() ==
TsTableColumnCategory.FIELD) {
ColumnSchema realColumn =
fieldColumnNameToSchema.get(fileColumn.getName());
- if (LOGGER.isDebugEnabled()
- && (realColumn == null ||
!fileColumn.getType().equals(realColumn.getType()))) {
+ if (realColumn != null &&
!fileColumn.getType().equals(realColumn.getType())) {
+ final String message =
+ String.format(
+ "Data type mismatch for column %s in table %s, type in
TsFile: %s, type in IoTDB: %s",
+ fileColumn.getName(),
+ realSchema.getTableName(),
+ fileColumn.getType(),
+ realColumn.getType());
+ if (shouldVerifyDataType) {
+ throw new LoadAnalyzeTypeMismatchException(message);
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(message);
+ }
+ } else if (LOGGER.isDebugEnabled() && realColumn == null) {
LOGGER.debug(
- "Data type mismatch for column {} in table {}, type in TsFile:
{}, type in IoTDB: {}",
+ "Column {} in table {} is not found in IoTDB while loading
TsFile.",
fileColumn.getName(),
- realSchema.getTableName(),
- fileColumn.getType(),
- Objects.nonNull(realColumn) ? realColumn.getType() : null);
+ realSchema.getTableName());
}
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
index f41c44763f9..1e279a18feb 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
@@ -39,7 +39,7 @@ public class IoTDBDataNodeReceiverTest {
try {
final LoadTsFileStatement statement =
IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
- "root.test.sg_0", tsFile.toString(), true);
+ "root.test.sg_0", tsFile.toString(), true, true);
Assert.assertEquals("root.test.sg_0", statement.getDatabase());
Assert.assertEquals(2, statement.getDatabaseLevel());
@@ -54,16 +54,17 @@ public class IoTDBDataNodeReceiverTest {
try {
final Map<String, String> attributes =
IoTDBDataNodeReceiver.buildLoadTsFileAttributesForAsync(
- "root.test.sg_0", true, true, true);
+ "root.test.sg_0", true, false, true);
Assert.assertEquals(
"root.test.sg_0",
attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY));
Assert.assertEquals("2",
attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY));
final LoadTsFileStatement statement =
LoadTsFileStatement.createUnchecked(tsFile.toString());
- ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement,
true);
+ ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement,
false);
Assert.assertEquals("root.test.sg_0", statement.getDatabase());
Assert.assertEquals(2, statement.getDatabaseLevel());
+ Assert.assertTrue(statement.isVerifySchema());
} finally {
Files.deleteIfExists(tsFile);
}
@@ -75,7 +76,8 @@ public class IoTDBDataNodeReceiverTest {
final Path tsFile =
Files.createTempFile("pipe-load-default-database-level", ".tsfile");
try {
final LoadTsFileStatement statement =
- IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(null,
tsFile.toString(), true);
+ IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+ null, tsFile.toString(), true, true);
Assert.assertNull(statement.getDatabase());
Assert.assertEquals(
@@ -92,7 +94,7 @@ public class IoTDBDataNodeReceiverTest {
try {
final LoadTsFileStatement statement =
IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
- "root.test.sg_0", tsFile.toString(), true);
+ "root.test.sg_0", tsFile.toString(), true, true);
final long receiverId = System.nanoTime();
final Exception exception = new RuntimeException("repeated receiver
exception " + receiverId);
@@ -107,4 +109,35 @@ public class IoTDBDataNodeReceiverTest {
Files.deleteIfExists(tsFile);
}
}
+
+ @Test
+ public void testLoadTsFileSyncStatementVerifiesSchemaWhenConvertingType()
throws Exception {
+ final Path tsFile =
Files.createTempFile("pipe-load-convert-verify-schema", ".tsfile");
+ try {
+ final LoadTsFileStatement statement =
+ IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+ "root.test.sg_0", tsFile.toString(), false, true);
+
+ Assert.assertTrue(statement.isConvertOnTypeMismatch());
+ Assert.assertTrue(statement.isVerifySchema());
+ } finally {
+ Files.deleteIfExists(tsFile);
+ }
+ }
+
+ @Test
+ public void
testLoadTsFileSyncStatementCanSkipVerifySchemaWhenNotConvertingType()
+ throws Exception {
+ final Path tsFile =
Files.createTempFile("pipe-load-no-convert-no-verify-schema", ".tsfile");
+ try {
+ final LoadTsFileStatement statement =
+ IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+ "root.test.sg_0", tsFile.toString(), false, false);
+
+ Assert.assertFalse(statement.isConvertOnTypeMismatch());
+ Assert.assertFalse(statement.isVerifySchema());
+ } finally {
+ Files.deleteIfExists(tsFile);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
index d6bdb1e37fe..01406b0fd06 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
@@ -19,7 +19,10 @@
package org.apache.iotdb.db.queryengine.plan.analyze.load;
+import
org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
@@ -33,6 +36,7 @@ import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator;
+import org.apache.tsfile.read.common.type.TypeFactory;
import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -45,6 +49,7 @@ import org.junit.Test;
import java.io.File;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
@@ -109,6 +114,41 @@ public class LoadTsFileAnalyzerTest {
Assert.assertEquals(2, schemaCache.getVerifiedDeviceCount());
}
+ @Test
+ public void testTableSchemaCacheShouldThrowMismatchWhenVerifyingDataType()
throws Exception {
+ final LoadTsFileTableSchemaCache schemaCache =
createTableSchemaCache(true);
+ try {
+ final InvocationTargetException exception =
+ Assert.assertThrows(
+ InvocationTargetException.class,
+ () ->
+ getVerifyTableDataTypeMethod()
+ .invoke(
+ schemaCache,
+ createTableSchema(TSDataType.INT64),
+ createTableSchema(TSDataType.DOUBLE)));
+
+ Assert.assertTrue(exception.getCause() instanceof
LoadAnalyzeTypeMismatchException);
+ } finally {
+ schemaCache.close();
+ }
+ }
+
+ @Test
+ public void
testTableSchemaCacheShouldNotThrowMismatchWhenSkippingDataTypeVerification()
+ throws Exception {
+ final LoadTsFileTableSchemaCache schemaCache =
createTableSchemaCache(false);
+ try {
+ getVerifyTableDataTypeMethod()
+ .invoke(
+ schemaCache,
+ createTableSchema(TSDataType.INT64),
+ createTableSchema(TSDataType.DOUBLE));
+ } finally {
+ schemaCache.close();
+ }
+ }
+
private void writeTableTsFileWithMixedDevices(final File tsFile) throws
Exception {
if (tsFile.exists()) {
Assert.assertTrue(tsFile.delete());
@@ -163,6 +203,33 @@ public class LoadTsFileAnalyzerTest {
tableSchemaCacheField.set(analyzer, schemaCache);
}
+ private LoadTsFileTableSchemaCache createTableSchemaCache(final boolean
shouldVerifyDataType)
+ throws LoadRuntimeOutOfMemoryException {
+ return new LoadTsFileTableSchemaCache(
+ null, new MPPQueryContext(new QueryId("load_test")), false,
shouldVerifyDataType);
+ }
+
+ private Method getVerifyTableDataTypeMethod() throws NoSuchMethodException {
+ final Method method =
+ LoadTsFileTableSchemaCache.class.getDeclaredMethod(
+ "verifyTableDataTypeAndGenerateTagColumnMapper",
+
org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema.class,
+
org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema.class);
+ method.setAccessible(true);
+ return method;
+ }
+
+ private
org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema
+ createTableSchema(final TSDataType fieldType) {
+ return new
org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema(
+ "table1",
+ Arrays.asList(
+ new ColumnSchema(
+ "tag1", TypeFactory.getType(TSDataType.STRING), false,
TsTableColumnCategory.TAG),
+ new ColumnSchema(
+ "s1", TypeFactory.getType(fieldType), false,
TsTableColumnCategory.FIELD)));
+ }
+
private boolean containsDevice(final Set<IDeviceID> devices, final String...
expectedSegments) {
return devices.stream()
.anyMatch(device -> Arrays.equals(device.getSegments(),
expectedSegments));
@@ -173,7 +240,7 @@ public class LoadTsFileAnalyzerTest {
private final Set<List<Object>> verifiedDevices = new HashSet<>();
private TrackingLoadTsFileTableSchemaCache() throws
LoadRuntimeOutOfMemoryException {
- super(null, new MPPQueryContext(new QueryId("load_test")), false);
+ super(null, new MPPQueryContext(new QueryId("load_test")), false, true);
}
@Override