This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 90055d55b6e Fix pipe receiver type conversion load path (#17849)
90055d55b6e is described below

commit 90055d55b6e1166580824e5cbed77a7c253ef514
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 11:20:37 2026 +0800

    Fix pipe receiver type conversion load path (#17849)
    
    * Add pipe type conversion semantic ITs
    
    * Fix float-to-date semantic IT expectation
    
    * Add aligned stream pipe type conversion IT
    
    * Fix pipe table type conversion load path
    
    * Fix alter type IT pipe memory flakiness
---
 .../db/it/schema/IoTDBAlterTimeSeriesTypeIT.java   |   7 +-
 .../pipe/it/dual/TypeConversionSemanticCase.java   | 315 +++++++++++++++++++++
 .../IoTDBPipeTypeConversionSemanticIT.java         | 255 +++++++++++++++++
 .../IoTDBPipeTypeConversionSemanticIT.java         | 259 +++++++++++++++++
 .../it/schema/IoTDBAlterColumnTypeIT.java          |   9 +-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  44 ++-
 .../plan/analyze/load/LoadTsFileAnalyzer.java      |   3 +-
 .../analyze/load/LoadTsFileTableSchemaCache.java   |  31 +-
 .../protocol/thrift/IoTDBDataNodeReceiverTest.java |  43 ++-
 .../plan/analyze/load/LoadTsFileAnalyzerTest.java  |  69 ++++-
 10 files changed, 1003 insertions(+), 32 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java
index e5fa5ca59a7..fcde3f61e37 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java
@@ -98,6 +98,7 @@ public class IoTDBAlterTimeSeriesTypeIT {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
     EnvFactory.getEnv().getConfig().getDataNodeConfig().setCompactionScheduleInterval(1000);
     EnvFactory.getEnv().initClusterEnvironment();
   }
@@ -878,11 +879,7 @@ public class IoTDBAlterTimeSeriesTypeIT {
           session.executeQueryStatement("select count(s1) from " + database + ".load_and_alter");
       RowRecord rec;
       rec = dataSet.next();
-      // Due to the operation of load tsfile execute directly, don't access memtable or generate
-      // InsertNode object, so don't need to check the data type.
-      // When query this measurement point, will only find the data of TSDataType.INT32. So this is
-      // reason what cause we can't find the data of TSDataType.DOUBLE. So result is 9, is not 15.
-      //      assertEquals(15, rec.getFields().get(0).getLongV());
+      // Before alter, DOUBLE TsFiles loaded directly are invisible under the existing INT32 schema.
       assertEquals(9, rec.getFields().get(0).getLongV());
       assertFalse(dataSet.hasNext());
     }
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java
new file mode 100644
index 00000000000..3ece5cf865e
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/TypeConversionSemanticCase.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual;
+
+import org.apache.iotdb.rpc.RpcUtils;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.BytesUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.List;
+
+public class TypeConversionSemanticCase {
+
+  public static final int ROW_COUNT = 3;
+
+  public static final List<TypeConversionSemanticCase> CASES =
+      Arrays.asList(
+          c(
+              "bool_to_int32",
+              TSDataType.BOOLEAN,
+              TSDataType.INT32,
+              values("true", "false", "true"),
+              values("1", "0", "1")),
+          c(
+              "bool_to_int64",
+              TSDataType.BOOLEAN,
+              TSDataType.INT64,
+              values("true", "false", "true"),
+              values("1", "0", "1")),
+          c(
+              "bool_to_float",
+              TSDataType.BOOLEAN,
+              TSDataType.FLOAT,
+              values("true", "false", "true"),
+              values("1.0", "0.0", "1.0")),
+          c(
+              "bool_to_double",
+              TSDataType.BOOLEAN,
+              TSDataType.DOUBLE,
+              values("true", "false", "true"),
+              values("1.0", "0.0", "1.0")),
+          c(
+              "bool_to_text",
+              TSDataType.BOOLEAN,
+              TSDataType.TEXT,
+              values("true", "false", "true"),
+              values("true", "false", "true")),
+          c(
+              "bool_to_blob",
+              TSDataType.BOOLEAN,
+              TSDataType.BLOB,
+              values("true", "false", "true"),
+              values(blobValue("true"), blobValue("false"), blobValue("true"))),
+          c(
+              "bool_to_string",
+              TSDataType.BOOLEAN,
+              TSDataType.STRING,
+              values("true", "false", "true"),
+              values("true", "false", "true")),
+          c(
+              "bool_to_date",
+              TSDataType.BOOLEAN,
+              TSDataType.DATE,
+              values("true", "false", "true"),
+              values("1970-01-02", "1970-01-01", "1970-01-02")),
+          c(
+              "bool_to_timestamp",
+              TSDataType.BOOLEAN,
+              TSDataType.TIMESTAMP,
+              values("true", "false", "true"),
+              values(timestampValue(1), timestampValue(0), timestampValue(1))),
+          c(
+              "int32_to_boolean",
+              TSDataType.INT32,
+              TSDataType.BOOLEAN,
+              values("0", "2", "-1"),
+              values("false", "true", "true")),
+          c(
+              "int32_to_timestamp",
+              TSDataType.INT32,
+              TSDataType.TIMESTAMP,
+              values("0", "1", "86400000"),
+              values(timestampValue(0), timestampValue(1), timestampValue(86400000))),
+          c(
+              "int32_to_date",
+              TSDataType.INT32,
+              TSDataType.DATE,
+              values("19700102", "20240229", "42"),
+              values("1970-01-02", "2024-02-29", "1970-01-01")),
+          c(
+              "int64_to_int32",
+              TSDataType.INT64,
+              TSDataType.INT32,
+              values("2147483648", "-2147483649", "42"),
+              values("-2147483648", "2147483647", "42")),
+          c(
+              "int64_to_boolean",
+              TSDataType.INT64,
+              TSDataType.BOOLEAN,
+              values("0", "2", "-1"),
+              values("false", "true", "true")),
+          c(
+              "int64_to_date",
+              TSDataType.INT64,
+              TSDataType.DATE,
+              values("19700102", "2147483648", "19700103"),
+              values("1970-01-02", "1970-01-01", "1970-01-03")),
+          c(
+              "float_to_int32",
+              TSDataType.FLOAT,
+              TSDataType.INT32,
+              values("2.9", "-2.9", "0.0"),
+              values("2", "-2", "0")),
+          c(
+              "float_to_boolean",
+              TSDataType.FLOAT,
+              TSDataType.BOOLEAN,
+              values("0.0", "0.1", "-0.1"),
+              values("false", "true", "true")),
+          c(
+              "float_to_date",
+              TSDataType.FLOAT,
+              TSDataType.DATE,
+              values("19700102.0", "19700104.0", "42.9"),
+              values("1970-01-02", "1970-01-04", "1970-01-01")),
+          c(
+              "double_to_int64",
+              TSDataType.DOUBLE,
+              TSDataType.INT64,
+              values("3.9", "-3.9", "0.0"),
+              values("3", "-3", "0")),
+          c(
+              "double_to_boolean",
+              TSDataType.DOUBLE,
+              TSDataType.BOOLEAN,
+              values("0.0", "0.1", "-0.1"),
+              values("false", "true", "true")),
+          c(
+              "double_to_timestamp",
+              TSDataType.DOUBLE,
+              TSDataType.TIMESTAMP,
+              values("1.9", "86400000.9", "0.0"),
+              values(timestampValue(1), timestampValue(86400000), timestampValue(0))),
+          c(
+              "text_to_int32",
+              TSDataType.TEXT,
+              TSDataType.INT32,
+              values("'123.9'", "'bad'", "'-123.9'"),
+              values("123", "0", "-123")),
+          c(
+              "string_to_int64",
+              TSDataType.STRING,
+              TSDataType.INT64,
+              values("'456.9'", "'bad'", "'-456.9'"),
+              values("456", "0", "-456")),
+          c(
+              "blob_to_float",
+              TSDataType.BLOB,
+              TSDataType.FLOAT,
+              values(blobSql("7.5"), blobSql("bad"), blobSql("-7.5")),
+              values("7.5", "0.0", "-7.5")),
+          c(
+              "text_to_double",
+              TSDataType.TEXT,
+              TSDataType.DOUBLE,
+              values("'8.25'", "'bad'", "'-8.25'"),
+              values("8.25", "0.0", "-8.25")),
+          c(
+              "text_to_boolean",
+              TSDataType.TEXT,
+              TSDataType.BOOLEAN,
+              values("'true'", "'1'", "'TrUe'"),
+              values("true", "false", "true")),
+          c(
+              "string_to_boolean",
+              TSDataType.STRING,
+              TSDataType.BOOLEAN,
+              values("'TRUE'", "'false'", "'yes'"),
+              values("true", "false", "false")),
+          c(
+              "blob_to_boolean",
+              TSDataType.BLOB,
+              TSDataType.BOOLEAN,
+              values(blobSql("true"), blobSql("FALSE"), blobSql("0")),
+              values("true", "false", "false")),
+          c(
+              "text_to_timestamp",
+              TSDataType.TEXT,
+              TSDataType.TIMESTAMP,
+              values("'86400000'", "'1970-01-02T00:00:00.000'", "'bad'"),
+              values(timestampValue(86400000), timestampValue(86400000), timestampValue(0))),
+          c(
+              "string_to_timestamp",
+              TSDataType.STRING,
+              TSDataType.TIMESTAMP,
+              values("'1970-01-03T00:00:00.000'", "'bad'", "'86400000'"),
+              values(timestampValue(172800000), timestampValue(0), timestampValue(86400000))),
+          c(
+              "blob_to_timestamp",
+              TSDataType.BLOB,
+              TSDataType.TIMESTAMP,
+              values(blobSql("bad"), blobSql("1"), blobSql("1970-01-02T00:00:00.000")),
+              values(timestampValue(0), timestampValue(1), timestampValue(86400000))),
+          c(
+              "text_to_date",
+              TSDataType.TEXT,
+              TSDataType.DATE,
+              values("'19700102'", "'1970-01-04'", "'bad'"),
+              values("1970-01-02", "1970-01-04", "1970-01-01")),
+          c(
+              "string_to_date",
+              TSDataType.STRING,
+              TSDataType.DATE,
+              values("'1970-01-03'", "'19700105'", "'1970-01-07'"),
+              values("1970-01-03", "1970-01-05", "1970-01-07")),
+          c(
+              "blob_to_date",
+              TSDataType.BLOB,
+              TSDataType.DATE,
+              values(blobSql("bad"), blobSql("1970-01-06"), blobSql("19700108")),
+              values("1970-01-01", "1970-01-06", "1970-01-08")),
+          c(
+              "timestamp_to_date",
+              TSDataType.TIMESTAMP,
+              TSDataType.DATE,
+              values("0", "86399999", "86400000"),
+              values("1970-01-01", "1970-01-01", "1970-01-02")),
+          c(
+              "date_to_timestamp",
+              TSDataType.DATE,
+              TSDataType.TIMESTAMP,
+              values("'1970-01-01'", "'1970-01-02'", "'1970-01-03'"),
+              values(timestampValue(0), timestampValue(86400000), timestampValue(172800000))),
+          c(
+              "timestamp_to_boolean",
+              TSDataType.TIMESTAMP,
+              TSDataType.BOOLEAN,
+              values("0", "-1", "1"),
+              values("false", "true", "true")),
+          c(
+              "date_to_boolean",
+              TSDataType.DATE,
+              TSDataType.BOOLEAN,
+              values("'1970-01-01'", "'1970-01-02'", "'1969-12-31'"),
+              values("false", "true", "true")));
+
+  public final String measurement;
+  public final TSDataType sourceType;
+  public final TSDataType targetType;
+  public final String[] sourceSqlValues;
+  public final String[] expectedValues;
+
+  private TypeConversionSemanticCase(
+      final String measurement,
+      final TSDataType sourceType,
+      final TSDataType targetType,
+      final String[] sourceSqlValues,
+      final String[] expectedValues) {
+    this.measurement = measurement;
+    this.sourceType = sourceType;
+    this.targetType = targetType;
+    this.sourceSqlValues = sourceSqlValues;
+    this.expectedValues = expectedValues;
+  }
+
+  private static TypeConversionSemanticCase c(
+      final String measurement,
+      final TSDataType sourceType,
+      final TSDataType targetType,
+      final String[] sourceSqlValues,
+      final String[] expectedValues) {
+    return new TypeConversionSemanticCase(
+        measurement, sourceType, targetType, sourceSqlValues, expectedValues);
+  }
+
+  private static String[] values(final String... values) {
+    return values;
+  }
+
+  public static String timestampValue(final long timestamp) {
+    return RpcUtils.formatDatetime("default", "ms", timestamp, ZoneOffset.UTC);
+  }
+
+  private static String blobSql(final String value) {
+    final StringBuilder builder = new StringBuilder("X'");
+    for (final byte b : value.getBytes(StandardCharsets.UTF_8)) {
+      builder.append(String.format("%02x", b & 0xFF));
+    }
+    return builder.append("'").toString();
+  }
+
+  private static String blobValue(final String value) {
+    return BytesUtils.parseBlobByteArrayToString(value.getBytes(StandardCharsets.UTF_8));
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java
new file mode 100644
index 00000000000..6704d37b87e
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionSemanticIT.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual.tablemodel.manual.enhanced;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.pipe.it.dual.TypeConversionSemanticCase;
+import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2DualTableManualEnhanced.class})
+public class IoTDBPipeTypeConversionSemanticIT extends AbstractPipeTableModelDualManualIT {
+
+  private static final String DATABASE = "pipe_type_conversion";
+  private static final String TABLE = "semantic_conversion";
+  private static final String STREAM_TABLE = "semantic_stream_conversion";
+  private static final List<TypeConversionSemanticCase> STREAM_CASES =
+      getCases(
+          "bool_to_int32",
+          "bool_to_int64",
+          "bool_to_float",
+          "bool_to_double",
+          "bool_to_blob",
+          "bool_to_date",
+          "bool_to_timestamp",
+          "int32_to_boolean",
+          "int32_to_timestamp",
+          "int32_to_date");
+
+  @Override
+  @Before
+  public void setUp() {
+    MultiEnvFactory.createEnv(2);
+    senderEnv = MultiEnvFactory.getEnv(0);
+    receiverEnv = MultiEnvFactory.getEnv(1);
+    setupConfig();
+    senderEnv.initClusterEnvironment(1, 1);
+    receiverEnv.initClusterEnvironment(1, 1);
+  }
+
+  @Override
+  protected void setupConfig() {
+    super.setupConfig();
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(1)
+        .setSchemaReplicationFactor(1);
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(1)
+        .setSchemaReplicationFactor(1);
+  }
+
+  @Test
+  public void testPipeReceiverTypeConversionSemantics() {
+    createDatabaseAndTable(senderEnv, TABLE, TypeConversionSemanticCase.CASES, true);
+    createDatabaseAndTable(receiverEnv, TABLE, TypeConversionSemanticCase.CASES, false);
+    createPipe();
+
+    TestUtils.executeNonQueries(
+        DATABASE,
+        BaseEnv.TABLE_SQL_DIALECT,
+        senderEnv,
+        createInsertStatements(TABLE, TypeConversionSemanticCase.CASES),
+        null);
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        createQuerySql(TABLE, TypeConversionSemanticCase.CASES),
+        createExpectedHeader(TypeConversionSemanticCase.CASES),
+        new HashSet<>(createExpectedRows(TypeConversionSemanticCase.CASES)),
+        60,
+        DATABASE,
+        null);
+  }
+
+  @Test
+  public void testStreamPipeReceiverTypeConversionSemantics() {
+    createDatabaseAndTable(senderEnv, STREAM_TABLE, STREAM_CASES, true);
+    createDatabaseAndTable(receiverEnv, STREAM_TABLE, STREAM_CASES, false);
+    createStreamPipe();
+
+    TestUtils.executeNonQueries(
+        DATABASE,
+        BaseEnv.TABLE_SQL_DIALECT,
+        senderEnv,
+        createInsertStatements(STREAM_TABLE, STREAM_CASES),
+        null);
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        createQuerySql(STREAM_TABLE, STREAM_CASES),
+        createExpectedHeader(STREAM_CASES),
+        new HashSet<>(createExpectedRows(STREAM_CASES)),
+        60,
+        DATABASE,
+        null);
+  }
+
+  private static void createDatabaseAndTable(
+      final BaseEnv env,
+      final String table,
+      final List<TypeConversionSemanticCase> conversionCases,
+      final boolean useSourceType) {
+    final List<String> sqls = new ArrayList<>();
+    sqls.add("create database if not exists " + DATABASE);
+    sqls.add("use " + DATABASE);
+    final List<String> columns = new ArrayList<>();
+    columns.add("tag_id string tag");
+    for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+      columns.add(
+          String.format(
+              "%s %s field",
+              conversionCase.measurement,
+              useSourceType ? conversionCase.sourceType : conversionCase.targetType));
+    }
+    sqls.add(String.format("create table %s (%s)", table, String.join(",", columns)));
+    TestUtils.executeNonQueries(null, BaseEnv.TABLE_SQL_DIALECT, env, sqls, null);
+  }
+
+  private void createPipe() {
+    TestUtils.executeNonQuery(
+        DATABASE,
+        BaseEnv.TABLE_SQL_DIALECT,
+        senderEnv,
+        String.format(
+            "create pipe type_conversion_semantic"
+                + " with source ('source'='iotdb-source','history.enable'='false','realtime.enable'='true','realtime.mode'='forced-log')"
+                + " with processor ('processor'='do-nothing-processor')"
+                + " with sink ('node-urls'='%s','batch.enable'='false','sink.format'='tablet')",
+            receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()),
+        null);
+  }
+
+  private void createStreamPipe() {
+    TestUtils.executeNonQuery(
+        DATABASE,
+        BaseEnv.TABLE_SQL_DIALECT,
+        senderEnv,
+        String.format(
+            "create pipe stream_type_conversion_semantic"
+                + " with source ('source'='iotdb-source','history.enable'='false','realtime.enable'='true','realtime.mode'='stream')"
+                + " with processor ('processor'='do-nothing-processor')"
+                + " with sink ('sink'='iotdb-thrift-sink','sink.node-urls'='%s')",
+            receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()),
+        null);
+  }
+
+  private static List<String> createInsertStatements(
+      final String table, final List<TypeConversionSemanticCase> conversionCases) {
+    final List<String> sqls = new ArrayList<>();
+    final String measurements =
+        String.join(
+            ",",
+            conversionCases.stream()
+                .map(conversionCase -> conversionCase.measurement)
+                .toArray(String[]::new));
+    for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) {
+      final List<String> values = new ArrayList<>();
+      for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+        values.add(conversionCase.sourceSqlValues[row]);
+      }
+      sqls.add(
+          String.format(
+              "insert into %s(time,tag_id,%s) values (%d,'d',%s)",
+              table, measurements, row + 1, String.join(",", values)));
+    }
+    sqls.add("flush");
+    return sqls;
+  }
+
+  private static String createQuerySql(
+      final String table, final List<TypeConversionSemanticCase> conversionCases) {
+    return String.format(
+        "select %s,time from %s where tag_id='d'",
+        String.join(
+            ",",
+            conversionCases.stream()
+                .map(conversionCase -> conversionCase.measurement)
+                .toArray(String[]::new)),
+        table);
+  }
+
+  private static String createExpectedHeader(
+      final List<TypeConversionSemanticCase> conversionCases) {
+    final List<String> columns = new ArrayList<>();
+    for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+      columns.add(conversionCase.measurement);
+    }
+    columns.add("time");
+    return String.join(",", columns) + ",";
+  }
+
+  private static List<String> createExpectedRows(
+      final List<TypeConversionSemanticCase> conversionCases) {
+    final List<String> rows = new ArrayList<>();
+    for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) {
+      final List<String> values = new ArrayList<>();
+      for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+        values.add(conversionCase.expectedValues[row]);
+      }
+      values.add(TypeConversionSemanticCase.timestampValue(row + 1));
+      rows.add(String.join(",", values) + ",");
+    }
+    return rows;
+  }
+
+  private static List<TypeConversionSemanticCase> getCases(final String... measurements) {
+    final List<TypeConversionSemanticCase> cases = new ArrayList<>();
+    for (final String measurement : measurements) {
+      cases.add(getCase(measurement));
+    }
+    return cases;
+  }
+
+  private static TypeConversionSemanticCase getCase(final String measurement) {
+    for (final TypeConversionSemanticCase conversionCase : TypeConversionSemanticCase.CASES) {
+      if (conversionCase.measurement.equals(measurement)) {
+        return conversionCase;
+      }
+    }
+    throw new IllegalArgumentException("Unknown type conversion semantic case: " + measurement);
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java
new file mode 100644
index 00000000000..b2218003379
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeTypeConversionSemanticIT.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual.treemodel.auto.enhanced;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoEnhanced;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.pipe.it.dual.TypeConversionSemanticCase;
+import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2DualTreeAutoEnhanced.class})
+public class IoTDBPipeTypeConversionSemanticIT extends AbstractPipeDualTreeModelAutoIT {
+
+  private static final String DEVICE = "root.pipe_type_conversion.d";
+  private static final String ALIGNED_DEVICE = "root.pipe_type_conversion.aligned_d";
+  private static final List<TypeConversionSemanticCase> ALIGNED_STREAM_CASES =
+      getCases(
+          "bool_to_int32",
+          "bool_to_int64",
+          "bool_to_float",
+          "bool_to_double",
+          "bool_to_blob",
+          "bool_to_date",
+          "bool_to_timestamp",
+          "int32_to_boolean",
+          "int32_to_timestamp",
+          "int32_to_date");
+
+  @Override
+  @Before
+  public void setUp() {
+    MultiEnvFactory.createEnv(2);
+    senderEnv = MultiEnvFactory.getEnv(0);
+    receiverEnv = MultiEnvFactory.getEnv(1);
+    setupConfig();
+    senderEnv.initClusterEnvironment(1, 1);
+    receiverEnv.initClusterEnvironment(1, 1);
+  }
+
+  @Override
+  protected void setupConfig() {
+    super.setupConfig();
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(1)
+        .setSchemaReplicationFactor(1);
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(1)
+        .setSchemaReplicationFactor(1);
+  }
+
+  @Test
+  public void testPipeReceiverTypeConversionSemantics() {
+    createTimeseries(senderEnv, DEVICE, TypeConversionSemanticCase.CASES, true);
+    createTimeseries(receiverEnv, DEVICE, TypeConversionSemanticCase.CASES, false);
+    createPipe();
+
+    TestUtils.executeNonQueries(
+        senderEnv, createInsertStatements(DEVICE, TypeConversionSemanticCase.CASES, false), null);
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        createQuerySql(DEVICE, TypeConversionSemanticCase.CASES),
+        createExpectedHeader(DEVICE, TypeConversionSemanticCase.CASES),
+        new HashSet<>(createExpectedRows(TypeConversionSemanticCase.CASES)),
+        60);
+  }
+
+  @Test
+  public void testAlignedStreamPipeReceiverTypeConversionSemantics() {
+    createAlignedTimeseries(senderEnv, ALIGNED_DEVICE, ALIGNED_STREAM_CASES, true);
+    createAlignedTimeseries(receiverEnv, ALIGNED_DEVICE, ALIGNED_STREAM_CASES, false);
+    createStreamPipe(ALIGNED_DEVICE);
+
+    TestUtils.executeNonQueries(
+        senderEnv, createInsertStatements(ALIGNED_DEVICE, ALIGNED_STREAM_CASES, true), null);
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        createQuerySql(ALIGNED_DEVICE, ALIGNED_STREAM_CASES),
+        createExpectedHeader(ALIGNED_DEVICE, ALIGNED_STREAM_CASES),
+        new HashSet<>(createExpectedRows(ALIGNED_STREAM_CASES)),
+        60);
+  }
+
+  private static void createTimeseries(
+      final BaseEnv env,
+      final String device,
+      final List<TypeConversionSemanticCase> conversionCases,
+      final boolean useSourceType) {
+    final List<String> sqls = new ArrayList<>();
+    for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+      sqls.add(
+          String.format(
+              "create timeseries %s.%s with datatype=%s,encoding=PLAIN",
+              device,
+              conversionCase.measurement,
+              useSourceType ? conversionCase.sourceType : conversionCase.targetType));
+    }
+    TestUtils.executeNonQueries(env, sqls, null);
+  }
+
+  private static void createAlignedTimeseries(
+      final BaseEnv env,
+      final String device,
+      final List<TypeConversionSemanticCase> conversionCases,
+      final boolean useSourceType) {
+    final List<String> measurements = new ArrayList<>();
+    for (final TypeConversionSemanticCase conversionCase : conversionCases) {
+      measurements.add(
+          String.format(
+              "%s %s encoding=PLAIN",
+              conversionCase.measurement,
+              useSourceType ? conversionCase.sourceType : conversionCase.targetType));
+    }
+    TestUtils.executeNonQuery(
+        env,
+        String.format("create aligned timeseries %s(%s)", device, String.join(",", measurements)),
+        null);
+  }
+
+  private void createPipe() {
+    TestUtils.executeNonQuery(
+        senderEnv,
+        String.format(
+            "create pipe type_conversion_semantic"
+                + " with source ('source'='iotdb-source','source.path'='%s.**','history.enable'='false','realtime.mode'='forced-log')"
+                + " with processor ('processor'='do-nothing-processor')"
+                + " with sink ('node-urls'='%s','batch.enable'='false','sink.format'='tablet')",
+            DEVICE, receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()),
+        null);
+  }
+
+  /**
+   * Creates the {@code aligned_type_conversion_semantic} pipe on the sender. The source covers
+   * {@code device.**} with history disabled and stream realtime mode; the thrift sink targets
+   * the first receiver DataNode.
+   *
+   * @param device device path whose subtree the pipe source captures
+   */
+  private void createStreamPipe(final String device) {
+    final String receiverUrl = receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString();
+    final String createPipeSql =
+        String.format(
+            "create pipe aligned_type_conversion_semantic"
+                + " with source ('source'='iotdb-source','source.path'='%s.**','history.enable'='false','realtime.mode'='stream')"
+                + " with processor ('processor'='do-nothing-processor')"
+                + " with sink ('sink'='iotdb-thrift-sink','sink.node-urls'='%s')",
+            device, receiverUrl);
+    TestUtils.executeNonQuery(senderEnv, createPipeSql, null);
+  }
+
+  /**
+   * Builds one insert statement per row, followed by a trailing {@code flush}.
+   *
+   * <p>Row {@code i} (0-based) is written at timestamp {@code i + 1} and carries the
+   * {@code sourceSqlValues[i]} literal of every conversion case, in case order.
+   *
+   * @param device device path to insert into
+   * @param conversionCases cases providing the measurement names and source literals
+   * @param isAligned whether to add the {@code aligned} keyword to each insert
+   * @return the insert statements in row order, then {@code "flush"}
+   */
+  private static List<String> createInsertStatements(
+      final String device,
+      final List<TypeConversionSemanticCase> conversionCases,
+      final boolean isAligned) {
+    final List<String> measurementNames = new ArrayList<>();
+    for (final TypeConversionSemanticCase semanticCase : conversionCases) {
+      measurementNames.add(semanticCase.measurement);
+    }
+    final String columnList = String.join(",", measurementNames);
+    final String alignedKeyword = isAligned ? " aligned" : "";
+
+    final List<String> statements = new ArrayList<>();
+    for (int row = 0; row < TypeConversionSemanticCase.ROW_COUNT; row++) {
+      // Lambdas can only capture effectively final locals, so pin the current row index.
+      final int rowIndex = row;
+      final String rowValues =
+          String.join(
+              ",",
+              conversionCases.stream()
+                  .map(semanticCase -> semanticCase.sourceSqlValues[rowIndex])
+                  .toArray(String[]::new));
+      statements.add(
+          String.format(
+              "insert into %s(time,%s)%s values (%d,%s)",
+              device, columnList, alignedKeyword, row + 1, rowValues));
+    }
+    statements.add("flush");
+    return statements;
+  }
+
+  /**
+   * Builds the query that selects every conversion-case measurement from {@code device}.
+   *
+   * @param device device path to query
+   * @param conversionCases cases providing the measurement names, in select order
+   * @return a {@code select m1,m2,... from device} statement
+   */
+  private static String createQuerySql(
+      final String device, final List<TypeConversionSemanticCase> conversionCases) {
+    final List<String> selectedColumns = new ArrayList<>();
+    for (final TypeConversionSemanticCase semanticCase : conversionCases) {
+      selectedColumns.add(semanticCase.measurement);
+    }
+    return String.format("select %s from %s", String.join(",", selectedColumns), device);
+  }
+
+  /**
+   * Builds the expected result header: {@code Time} followed by the full path
+   * {@code device.measurement} of every conversion case. Columns are comma-separated and the
+   * header ends with a trailing comma.
+   *
+   * @param device device path used as the prefix of every column
+   * @param conversionCases cases providing the measurement names, in column order
+   * @return the header string
+   */
+  private static String createExpectedHeader(
+      final String device, final List<TypeConversionSemanticCase> conversionCases) {
+    final StringBuilder header = new StringBuilder("Time");
+    for (final TypeConversionSemanticCase semanticCase : conversionCases) {
+      header.append(',').append(device).append('.').append(semanticCase.measurement);
+    }
+    return header.append(',').toString();
+  }
+
+  /**
+   * Builds the expected result rows. Row {@code i} (0-based) starts with timestamp
+   * {@code i + 1}, followed by {@code expectedValues[i]} of every conversion case. Fields are
+   * comma-separated and each row ends with a trailing comma.
+   *
+   * @param conversionCases cases providing the expected values, in column order
+   * @return one string per row, in row order
+   */
+  private static List<String> createExpectedRows(
+      final List<TypeConversionSemanticCase> conversionCases) {
+    final List<String> expectedRows = new ArrayList<>();
+    for (int rowIndex = 0; rowIndex < TypeConversionSemanticCase.ROW_COUNT; rowIndex++) {
+      final StringBuilder line = new StringBuilder().append(rowIndex + 1);
+      for (final TypeConversionSemanticCase semanticCase : conversionCases) {
+        line.append(',').append(semanticCase.expectedValues[rowIndex]);
+      }
+      expectedRows.add(line.append(',').toString());
+    }
+    return expectedRows;
+  }
+
+  /**
+   * Resolves each measurement name to its conversion case, preserving argument order.
+   *
+   * @param measurements measurement names to look up
+   * @return the matching cases, one per name
+   * @throws IllegalArgumentException if any name has no matching case
+   */
+  private static List<TypeConversionSemanticCase> getCases(final String... measurements) {
+    final List<TypeConversionSemanticCase> resolved = new ArrayList<>(measurements.length);
+    for (int i = 0; i < measurements.length; i++) {
+      resolved.add(getCase(measurements[i]));
+    }
+    return resolved;
+  }
+
+  /**
+   * Finds the first entry of {@code TypeConversionSemanticCase.CASES} whose measurement equals
+   * {@code measurement}.
+   *
+   * @param measurement measurement name to look up
+   * @return the matching case
+   * @throws IllegalArgumentException if no case has that measurement
+   */
+  private static TypeConversionSemanticCase getCase(final String measurement) {
+    for (final TypeConversionSemanticCase candidate : TypeConversionSemanticCase.CASES) {
+      if (!candidate.measurement.equals(measurement)) {
+        continue;
+      }
+      return candidate;
+    }
+    throw new IllegalArgumentException("Unknown type conversion semantic case: " + measurement);
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
index 0903a0a3a0d..f04b6682962 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
@@ -99,6 +99,7 @@ public class IoTDBAlterColumnTypeIT {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
     
EnvFactory.getEnv().getConfig().getDataNodeConfig().setCompactionScheduleInterval(1000);
     EnvFactory.getEnv().initClusterEnvironment();
     try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnection()) {
@@ -936,12 +937,8 @@ public class IoTDBAlterColumnTypeIT {
           session.executeQueryStatement("select count(s1) from 
load_and_alter");
       RowRecord rec;
       rec = dataSet.next();
-      // Due to the operation of load tsfile execute directly, don't access 
memtable or generate
-      // InsertNode object, so don't need to check the data type.
-      // When query this measurement point, will only find the data of 
TSDataType.INT32. So this is
-      // reason what cause we can't find the data of TSDataType.DOUBLE. So 
result is 9, is not 18.
-      //      assertEquals(18, rec.getFields().get(0).getLongV());
-      assertEquals(9, rec.getFields().get(0).getLongV());
+      // LOAD converts TsFiles with mismatched types into tablets by default.
+      assertEquals(18, rec.getFields().get(0).getLongV());
       assertFalse(dataSet.hasNext());
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 731c2eb50f0..e6c8ddefc99 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -590,7 +590,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
         dataBaseName,
         LoadTsFileStatement.getDatabaseLevelByTreeDatabase(dataBaseName),
         shouldConvertDataTypeOnTypeMismatch,
-        validateTsFile,
+        validateTsFile || shouldConvertDataTypeOnTypeMismatch,
         null,
         shouldMarkAsPipeRequest);
   }
@@ -598,16 +598,23 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private TSStatus loadTsFileSync(final String dataBaseName, final String 
fileAbsolutePath)
       throws FileNotFoundException {
     return executeStatementAndClassifyExceptions(
-        buildLoadTsFileStatementForSync(dataBaseName, fileAbsolutePath, 
validateTsFile.get()));
+        buildLoadTsFileStatementForSync(
+            dataBaseName,
+            fileAbsolutePath,
+            validateTsFile.get(),
+            shouldConvertDataTypeOnTypeMismatch));
   }
 
   static LoadTsFileStatement buildLoadTsFileStatementForSync(
-      final String dataBaseName, final String fileAbsolutePath, final boolean 
validateTsFile)
+      final String dataBaseName,
+      final String fileAbsolutePath,
+      final boolean validateTsFile,
+      final boolean shouldConvertDataTypeOnTypeMismatch)
       throws FileNotFoundException {
     final LoadTsFileStatement statement = 
LoadTsFileStatement.createUnchecked(fileAbsolutePath);
     statement.setDeleteAfterLoad(true);
-    statement.setConvertOnTypeMismatch(true);
-    statement.setVerifySchema(validateTsFile);
+    statement.setConvertOnTypeMismatch(shouldConvertDataTypeOnTypeMismatch);
+    statement.setVerifySchema(validateTsFile || 
shouldConvertDataTypeOnTypeMismatch);
     statement.setAutoCreateDatabase(
         IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
     statement.setDatabase(dataBaseName);
@@ -956,14 +963,26 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       }
     }
 
+    // Execute insert statements through the conversion wrapper first to avoid 
writing a partial
+    // row/tablet before the type mismatch is converted.
+    if (shouldConvertDataTypeOnTypeMismatch && statement instanceof 
InsertBaseStatement) {
+      final Optional<TSStatus> convertedStatus =
+          executeInsertStatementWithDataTypeConversion(
+              statement, isTableModelStatement, databaseName);
+      if (convertedStatus.isPresent()) {
+        return convertedStatus.get();
+      }
+    }
+
     // Real execution of the statement
     final TSStatus status =
         isTableModelStatement
             ? executeStatementForTableModel(statement, databaseName)
             : executeStatementForTreeModel(statement);
 
-    // Try to convert data type if the statement is a tree model statement
-    // and the status code is not success
+    // Try to convert data type if the status code is not success. Insert 
statements normally return
+    // above after the first converted execution. The retry path is kept for 
load and fallback
+    // cases.
     if (!shouldConvertDataTypeOnTypeMismatch
         || !((statement instanceof InsertBaseStatement
                 && ((InsertBaseStatement) statement).hasFailedMeasurements())
@@ -986,6 +1005,17 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
         : statement.accept(treeStatementDataTypeConvertExecutionVisitor, 
status).orElse(status);
   }
 
+  /**
+   * Dispatches the statement to the data-type-conversion visitor of its model, seeding the
+   * visitor with a SUCCESS status. The table-model visitor also receives the database name.
+   *
+   * @param statement statement to execute through the conversion visitor
+   * @param isTableModelStatement whether to use the table-model visitor instead of the
+   *     tree-model one
+   * @param databaseName database name paired with the status for the table-model visitor
+   * @return whatever the visitor returns for this statement
+   */
+  private Optional<TSStatus> executeInsertStatementWithDataTypeConversion(
+      final Statement statement, final boolean isTableModelStatement, final String databaseName) {
+    final TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    if (isTableModelStatement) {
+      return statement.accept(
+          tableStatementDataTypeConvertExecutionVisitor,
+          new Pair<>(successStatus, databaseName));
+    }
+    return statement.accept(treeStatementDataTypeConvertExecutionVisitor, successStatus);
+  }
+
   private boolean shouldUseTableModelVisitorForLoadStatement(
       final LoadTsFileStatement loadTsFileStatement) {
     final List<Boolean> isTableModel = loadTsFileStatement.getIsTableModel();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 5171ccd93c3..e357b86e0c7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -619,7 +619,8 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
 
   private LoadTsFileTableSchemaCache getOrCreateTableSchemaCache() {
     if (tableSchemaCache == null) {
-      tableSchemaCache = new LoadTsFileTableSchemaCache(metadata, context, 
isAutoCreateDatabase);
+      tableSchemaCache =
+          new LoadTsFileTableSchemaCache(metadata, context, 
isAutoCreateDatabase, isVerifySchema);
     }
     return tableSchemaCache;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
index c3c0ea73310..92a56d29a55 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
+import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
 import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -98,6 +99,7 @@ public class LoadTsFileTableSchemaCache {
   private Map<String, org.apache.tsfile.file.metadata.TableSchema> 
tableSchemaMap;
   private final Metadata metadata;
   private final MPPQueryContext context;
+  private final boolean shouldVerifyDataType;
 
   private Map<String, Set<IDeviceID>> currentBatchTable2Devices;
 
@@ -116,13 +118,17 @@ public class LoadTsFileTableSchemaCache {
   private final AtomicBoolean needDecode4DifferentTimeColumn = new 
AtomicBoolean(false);
 
   public LoadTsFileTableSchemaCache(
-      final Metadata metadata, final MPPQueryContext context, final boolean 
needToCreateDatabase)
+      final Metadata metadata,
+      final MPPQueryContext context,
+      final boolean needToCreateDatabase,
+      final boolean shouldVerifyDataType)
       throws LoadRuntimeOutOfMemoryException {
     this.block =
         LoadTsFileMemoryManager.getInstance()
             .allocateMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
     this.metadata = metadata;
     this.context = context;
+    this.shouldVerifyDataType = shouldVerifyDataType;
     this.currentBatchTable2Devices = new HashMap<>();
     this.currentModifications = PatternTreeMapFactory.getModsPatternTreeMap();
     this.needToCreateDatabase = needToCreateDatabase;
@@ -389,14 +395,25 @@ public class LoadTsFileTableSchemaCache {
         }
       } else if (fileColumn.getColumnCategory() == 
TsTableColumnCategory.FIELD) {
         ColumnSchema realColumn = 
fieldColumnNameToSchema.get(fileColumn.getName());
-        if (LOGGER.isDebugEnabled()
-            && (realColumn == null || 
!fileColumn.getType().equals(realColumn.getType()))) {
+        if (realColumn != null && 
!fileColumn.getType().equals(realColumn.getType())) {
+          final String message =
+              String.format(
+                  "Data type mismatch for column %s in table %s, type in 
TsFile: %s, type in IoTDB: %s",
+                  fileColumn.getName(),
+                  realSchema.getTableName(),
+                  fileColumn.getType(),
+                  realColumn.getType());
+          if (shouldVerifyDataType) {
+            throw new LoadAnalyzeTypeMismatchException(message);
+          }
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(message);
+          }
+        } else if (LOGGER.isDebugEnabled() && realColumn == null) {
           LOGGER.debug(
-              "Data type mismatch for column {} in table {}, type in TsFile: 
{}, type in IoTDB: {}",
+              "Column {} in table {} is not found in IoTDB while loading 
TsFile.",
               fileColumn.getName(),
-              realSchema.getTableName(),
-              fileColumn.getType(),
-              Objects.nonNull(realColumn) ? realColumn.getType() : null);
+              realSchema.getTableName());
         }
       }
     }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
index f41c44763f9..1e279a18feb 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
@@ -39,7 +39,7 @@ public class IoTDBDataNodeReceiverTest {
     try {
       final LoadTsFileStatement statement =
           IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
-              "root.test.sg_0", tsFile.toString(), true);
+              "root.test.sg_0", tsFile.toString(), true, true);
 
       Assert.assertEquals("root.test.sg_0", statement.getDatabase());
       Assert.assertEquals(2, statement.getDatabaseLevel());
@@ -54,16 +54,17 @@ public class IoTDBDataNodeReceiverTest {
     try {
       final Map<String, String> attributes =
           IoTDBDataNodeReceiver.buildLoadTsFileAttributesForAsync(
-              "root.test.sg_0", true, true, true);
+              "root.test.sg_0", true, false, true);
 
       Assert.assertEquals(
           "root.test.sg_0", 
attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY));
       Assert.assertEquals("2", 
attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY));
 
       final LoadTsFileStatement statement = 
LoadTsFileStatement.createUnchecked(tsFile.toString());
-      ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, 
true);
+      ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, 
false);
       Assert.assertEquals("root.test.sg_0", statement.getDatabase());
       Assert.assertEquals(2, statement.getDatabaseLevel());
+      Assert.assertTrue(statement.isVerifySchema());
     } finally {
       Files.deleteIfExists(tsFile);
     }
@@ -75,7 +76,8 @@ public class IoTDBDataNodeReceiverTest {
     final Path tsFile = 
Files.createTempFile("pipe-load-default-database-level", ".tsfile");
     try {
       final LoadTsFileStatement statement =
-          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(null, 
tsFile.toString(), true);
+          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+              null, tsFile.toString(), true, true);
 
       Assert.assertNull(statement.getDatabase());
       Assert.assertEquals(
@@ -92,7 +94,7 @@ public class IoTDBDataNodeReceiverTest {
     try {
       final LoadTsFileStatement statement =
           IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
-              "root.test.sg_0", tsFile.toString(), true);
+              "root.test.sg_0", tsFile.toString(), true, true);
       final long receiverId = System.nanoTime();
       final Exception exception = new RuntimeException("repeated receiver 
exception " + receiverId);
 
@@ -107,4 +109,35 @@ public class IoTDBDataNodeReceiverTest {
       Files.deleteIfExists(tsFile);
     }
   }
+
+  // Requesting type conversion must turn schema verification on even when TsFile validation
+  // itself is disabled.
+  @Test
+  public void testLoadTsFileSyncStatementVerifiesSchemaWhenConvertingType() throws Exception {
+    final boolean validateTsFile = false;
+    final boolean convertOnTypeMismatch = true;
+    final Path tempTsFile = Files.createTempFile("pipe-load-convert-verify-schema", ".tsfile");
+    try {
+      final LoadTsFileStatement loadStatement =
+          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+              "root.test.sg_0", tempTsFile.toString(), validateTsFile, convertOnTypeMismatch);
+
+      Assert.assertTrue(loadStatement.isConvertOnTypeMismatch());
+      Assert.assertTrue(loadStatement.isVerifySchema());
+    } finally {
+      Files.deleteIfExists(tempTsFile);
+    }
+  }
+
+  // With both TsFile validation and type conversion disabled, the statement must neither
+  // convert on mismatch nor verify the schema.
+  @Test
+  public void testLoadTsFileSyncStatementCanSkipVerifySchemaWhenNotConvertingType()
+      throws Exception {
+    final boolean validateTsFile = false;
+    final boolean convertOnTypeMismatch = false;
+    final Path tempTsFile =
+        Files.createTempFile("pipe-load-no-convert-no-verify-schema", ".tsfile");
+    try {
+      final LoadTsFileStatement loadStatement =
+          IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+              "root.test.sg_0", tempTsFile.toString(), validateTsFile, convertOnTypeMismatch);
+
+      Assert.assertFalse(loadStatement.isConvertOnTypeMismatch());
+      Assert.assertFalse(loadStatement.isVerifySchema());
+    } finally {
+      Files.deleteIfExists(tempTsFile);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
index d6bdb1e37fe..01406b0fd06 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
@@ -19,7 +19,10 @@
 
 package org.apache.iotdb.db.queryengine.plan.analyze.load;
 
+import 
org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
 import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
@@ -33,6 +36,7 @@ import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator;
+import org.apache.tsfile.read.common.type.TypeFactory;
 import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -45,6 +49,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.Collections;
@@ -109,6 +114,41 @@ public class LoadTsFileAnalyzerTest {
     Assert.assertEquals(2, schemaCache.getVerifiedDeviceCount());
   }
 
+  // With data type verification enabled, schemas whose FIELD column differs (INT64 vs DOUBLE)
+  // must make the reflectively invoked check fail with LoadAnalyzeTypeMismatchException.
+  @Test
+  public void testTableSchemaCacheShouldThrowMismatchWhenVerifyingDataType() throws Exception {
+    final LoadTsFileTableSchemaCache verifyingCache = createTableSchemaCache(true);
+    try {
+      // Method.invoke wraps the target's exception, so the real failure is the cause.
+      final Throwable cause =
+          Assert.assertThrows(
+                  InvocationTargetException.class,
+                  () ->
+                      getVerifyTableDataTypeMethod()
+                          .invoke(
+                              verifyingCache,
+                              createTableSchema(TSDataType.INT64),
+                              createTableSchema(TSDataType.DOUBLE)))
+              .getCause();
+
+      Assert.assertTrue(cause instanceof LoadAnalyzeTypeMismatchException);
+    } finally {
+      verifyingCache.close();
+    }
+  }
+
+  // With data type verification disabled, the same INT64 vs DOUBLE mismatch must pass through
+  // the reflectively invoked check without an exception.
+  @Test
+  public void testTableSchemaCacheShouldNotThrowMismatchWhenSkippingDataTypeVerification()
+      throws Exception {
+    final LoadTsFileTableSchemaCache lenientCache = createTableSchemaCache(false);
+    try {
+      final Method verifyMethod = getVerifyTableDataTypeMethod();
+      verifyMethod.invoke(
+          lenientCache,
+          createTableSchema(TSDataType.INT64),
+          createTableSchema(TSDataType.DOUBLE));
+    } finally {
+      lenientCache.close();
+    }
+  }
+
   private void writeTableTsFileWithMixedDevices(final File tsFile) throws 
Exception {
     if (tsFile.exists()) {
       Assert.assertTrue(tsFile.delete());
@@ -163,6 +203,33 @@ public class LoadTsFileAnalyzerTest {
     tableSchemaCacheField.set(analyzer, schemaCache);
   }
 
+  /**
+   * Creates a schema cache with no metadata, a fresh {@code load_test} query context and
+   * database auto-creation disabled.
+   *
+   * @param shouldVerifyDataType value passed through as the cache's data type verification flag
+   */
+  private LoadTsFileTableSchemaCache createTableSchemaCache(final boolean shouldVerifyDataType)
+      throws LoadRuntimeOutOfMemoryException {
+    final MPPQueryContext queryContext = new MPPQueryContext(new QueryId("load_test"));
+    return new LoadTsFileTableSchemaCache(null, queryContext, false, shouldVerifyDataType);
+  }
+
+  /**
+   * Looks up {@code verifyTableDataTypeAndGenerateTagColumnMapper(TableSchema, TableSchema)} on
+   * {@link LoadTsFileTableSchemaCache} by reflection and makes it accessible.
+   *
+   * @throws NoSuchMethodException if the cache declares no method with that signature
+   */
+  private Method getVerifyTableDataTypeMethod() throws NoSuchMethodException {
+    final Class<?> tableSchemaClass =
+        org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema.class;
+    final Method verifyMethod =
+        LoadTsFileTableSchemaCache.class.getDeclaredMethod(
+            "verifyTableDataTypeAndGenerateTagColumnMapper", tableSchemaClass, tableSchemaClass);
+    verifyMethod.setAccessible(true);
+    return verifyMethod;
+  }
+
+  /**
+   * Builds a schema for {@code table1} with a STRING tag column {@code tag1} and one FIELD
+   * column {@code s1} of the given type.
+   *
+   * @param fieldType data type of the {@code s1} field column
+   */
+  private org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema
+      createTableSchema(final TSDataType fieldType) {
+    final ColumnSchema tagColumn =
+        new ColumnSchema(
+            "tag1", TypeFactory.getType(TSDataType.STRING), false, TsTableColumnCategory.TAG);
+    final ColumnSchema fieldColumn =
+        new ColumnSchema(
+            "s1", TypeFactory.getType(fieldType), false, TsTableColumnCategory.FIELD);
+    return new org.apache.iotdb.commons.queryengine.plan.relational.metadata.TableSchema(
+        "table1", Arrays.asList(tagColumn, fieldColumn));
+  }
+
   private boolean containsDevice(final Set<IDeviceID> devices, final String... 
expectedSegments) {
     return devices.stream()
         .anyMatch(device -> Arrays.equals(device.getSegments(), 
expectedSegments));
@@ -173,7 +240,7 @@ public class LoadTsFileAnalyzerTest {
     private final Set<List<Object>> verifiedDevices = new HashSet<>();
 
     private TrackingLoadTsFileTableSchemaCache() throws 
LoadRuntimeOutOfMemoryException {
-      super(null, new MPPQueryContext(new QueryId("load_test")), false);
+      super(null, new MPPQueryContext(new QueryId("load_test")), false, true);
     }
 
     @Override

Reply via email to