This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1811e13f640 Pipe: Retry history LoadTsFile while waiting for schema (#18031)
1811e13f640 is described below

commit 1811e13f6405a1ea2a38ad856bfa32c92ffdd78a
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 26 14:14:29 2026 +0800

    Pipe: Retry history LoadTsFile while waiting for schema (#18031)
    
    * Fix pipe history LoadTsFile schema retry
    
    * Address load tsfile schema retry review
    
    * Fix load tsfile tests with real temp files
---
 .../IoTDBPipeReceiverAutoCreateDisabledIT.java     | 64 +++++++++++++++++++++-
 .../iotdb/db/i18n/DataNodeQueryMessages.java       |  6 ++
 .../iotdb/db/i18n/DataNodeQueryMessages.java       |  6 ++
 .../load/LoadAnalyzeMissingSchemaException.java    | 27 +++++++++
 .../visitor/PipeStatementTSStatusVisitor.java      |  4 +-
 .../plan/analyze/load/LoadTsFileAnalyzer.java      | 57 +++++++++++++++++++
 .../load/TreeSchemaAutoCreatorAndVerifier.java     | 22 +++++---
 .../receiver/PipeStatementTsStatusVisitorTest.java | 32 +++++++++++
 .../plan/analyze/load/LoadTsFileAnalyzerTest.java  | 33 +++++++++++
 9 files changed, 237 insertions(+), 14 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
index 158dd552c20..58aced70ed3 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
@@ -36,6 +36,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -61,14 +62,16 @@ public class IoTDBPipeReceiverAutoCreateDisabledIT extends 
AbstractPipeDualTreeM
         .getConfig()
         .getCommonConfig()
         .setDataReplicationFactor(1)
-        .setSchemaReplicationFactor(1);
+        .setSchemaReplicationFactor(1)
+        .setPipeAutoSplitFullEnabled(true);
     receiverEnv
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(false)
         .setDatanodeMemoryProportion("3:3:1:1:1:0")
         .setDataReplicationFactor(1)
-        .setSchemaReplicationFactor(1);
+        .setSchemaReplicationFactor(1)
+        .setPipeAutoSplitFullEnabled(true);
   }
 
   @Test
@@ -122,6 +125,63 @@ public class IoTDBPipeReceiverAutoCreateDisabledIT extends 
AbstractPipeDualTreeM
     }
   }
 
+  @Test
+  public void 
testAutoSplitHistoryTsFileWithDeletionWhenReceiverAutoCreateSchemaDisabled()
+      throws Exception {
+    TestUtils.executeNonQueries(
+        senderEnv,
+        Arrays.asList(
+            "create database root.sg",
+            "create timeseries root.sg.non_aligned.s1 with datatype=INT32",
+            "create timeseries root.sg.non_aligned.s2 with datatype=DOUBLE",
+            "create aligned timeseries root.sg.aligned(s1 INT32, s2 DOUBLE)",
+            "create timeseries root.sg.deleted_measurement.s1 with 
datatype=INT32",
+            "create timeseries root.sg.deleted_measurement.s2 with 
datatype=DOUBLE",
+            "insert into root.sg.non_aligned(time, s1, s2) values(1, 1, 1.0), 
(2, 2, 2.0), (3, 3, 3.0)",
+            "insert into root.sg.aligned(time, s1, s2) values(1, 10, 10.0), 
(2, 20, 20.0), (3, 30, 30.0)",
+            "insert into root.sg.deleted_measurement(time, s1, s2) values(1, 
100, 100.0), (2, 200, 200.0)",
+            "flush",
+            "delete from root.sg.non_aligned.s1 where time > 2",
+            "delete from root.sg.aligned.* where time > 2",
+            "delete timeseries root.sg.deleted_measurement.s1",
+            "flush"));
+
+    awaitUntilFlush(senderEnv);
+
+    TestUtils.executeNonQuery(
+        senderEnv,
+        String.format(
+            "create pipe test with source ('inclusion'='all', 
'source.history.enable'='true', 'source.realtime.mode'='batch') "
+                + "with sink ('sink'='iotdb-thrift-sink', 
'sink.node-urls'='%s')",
+            receiverEnv.getDataNodeWrapper(0).getIpAndPortString()));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select * from root.sg.non_aligned",
+        "Time,root.sg.non_aligned.s1,root.sg.non_aligned.s2,",
+        new HashSet<>(Arrays.asList("1,1,1.0,", "2,2,2.0,", "3,null,3.0,")));
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select * from root.sg.aligned",
+        "Time,root.sg.aligned.s1,root.sg.aligned.s2,",
+        new HashSet<>(Arrays.asList("1,10,10.0,", "2,20,20.0,")));
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select * from root.sg.deleted_measurement",
+        "Time,root.sg.deleted_measurement.s2,",
+        new HashSet<>(Arrays.asList("1,100.0,", "2,200.0,")));
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "count timeseries root.sg.deleted_measurement.*",
+        "count(timeseries),",
+        new HashSet<>(Arrays.asList("1,")));
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "show devices root.sg.aligned",
+        "Device,IsAligned,Template,TTL(ms),",
+        new HashSet<>(Arrays.asList("root.sg.aligned,true,null,INF,")));
+  }
+
   private QueryResult queryForResult(final Statement statement, final String 
sql)
       throws SQLException {
     try (final ResultSet resultSet = statement.executeQuery(sql)) {
diff --git 
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
 
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index f870290c09a..96b138a9fe1 100644
--- 
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++ 
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -314,6 +314,12 @@ public final class DataNodeQueryMessages {
       "Empty file detected, will skip loading this file: {}";
   public static final String AUTO_CREATE_OR_VERIFY_SCHEMA_ERROR =
       "Auto create or verify schema error.";
+  public static final String 
LOAD_TSFILE_DEVICE_SCHEMA_MISSING_AUTO_CREATE_DISABLED =
+      "Device %s does not exist in IoTDB and can not be created. Please check 
whether auto-create-schema is enabled.";
+  public static final String 
LOAD_TSFILE_MEASUREMENT_SCHEMA_MISSING_AUTO_CREATE_DISABLED =
+      "Measurement %s does not exist in IoTDB and can not be created. Please 
check whether auto-create-schema is enabled.";
+  public static final String 
PIPE_GENERATED_LOAD_TSFILE_WAITING_FOR_SCHEMA_METADATA =
+      "Pipe generated LoadTsFile is waiting for schema metadata to be 
transferred. Detail: %s";
   public static final String FAILED_TO_FIND_TAG_COLUMN_MAPPING_FOR_TABLE =
       "Failed to find tag column mapping for table {}";
   public static final String AUTO_CREATE_DATABASE_FAILED_BECAUSE =
diff --git 
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
 
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index b0dae59e198..aa2436c442f 100644
--- 
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++ 
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -312,6 +312,12 @@ public final class DataNodeQueryMessages {
       "检测到空文件,将跳过加载此文件:{}";
   public static final String AUTO_CREATE_OR_VERIFY_SCHEMA_ERROR =
       "自动创建或验证 schema 出错。";
+  public static final String 
LOAD_TSFILE_DEVICE_SCHEMA_MISSING_AUTO_CREATE_DISABLED =
+      "设备 %s 在 IoTDB 中不存在且无法被创建。请检查是否启用了 auto-create-schema。";
+  public static final String 
LOAD_TSFILE_MEASUREMENT_SCHEMA_MISSING_AUTO_CREATE_DISABLED =
+      "时间序列 %s 在 IoTDB 中不存在且无法被创建。请检查是否启用了 auto-create-schema。";
+  public static final String 
PIPE_GENERATED_LOAD_TSFILE_WAITING_FOR_SCHEMA_METADATA =
+      "Pipe 生成的 LoadTsFile 正在等待 schema 元数据传输完成。详情:%s";
   public static final String FAILED_TO_FIND_TAG_COLUMN_MAPPING_FOR_TABLE =
       "未找到表 {} 的标签列映射";
   public static final String AUTO_CREATE_DATABASE_FAILED_BECAUSE =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadAnalyzeMissingSchemaException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadAnalyzeMissingSchemaException.java
new file mode 100644
index 00000000000..9f5e129d56f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadAnalyzeMissingSchemaException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception.load;
+
+public class LoadAnalyzeMissingSchemaException extends LoadAnalyzeException {
+
+  public LoadAnalyzeMissingSchemaException(final String message) {
+    super(message);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index ff2805b2e7d..df316b79367 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -74,9 +74,7 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
   public TSStatus visitLoadFile(
       final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
     if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
-        || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
-            && status.getMessage() != null
-            && status.getMessage().contains("memory")) {
+        || status.getCode() == 
TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
           .setMessage(status.getMessage());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index e357b86e0c7..b1bdd077dc4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.queryengine.common.SqlDialect;
 import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
+import org.apache.iotdb.db.exception.load.LoadAnalyzeMissingSchemaException;
 import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
 import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
 import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
@@ -222,6 +223,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       executeTabletConversionOnException(analysis, e);
       return analysis;
     } catch (Exception e) {
+      if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) {
+        return analysis;
+      }
       final String exceptionMessage =
           String.format(
               "Auto create or verify schema error when executing statement %s. 
Detail: %s.",
@@ -346,6 +350,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
                 "The file %s is not a valid tsfile. Please check the input 
file.",
                 tsFile.getPath()));
       } catch (Exception e) {
+        if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) {
+          return false;
+        }
         final String exceptionMessage =
             String.format(
                 "Loading file %s failed. Detail: %s",
@@ -681,8 +688,26 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
     analysis.setFailStatus(RpcUtils.getStatus(e.getCode(), e.getMessage()));
   }
 
+  private void setFailAnalysisForTemporaryUnavailablePipeSchema(
+      final IAnalysis analysis, final Throwable throwable) {
+    final String exceptionMessage =
+        String.format(
+            
DataNodeQueryMessages.PIPE_GENERATED_LOAD_TSFILE_WAITING_FOR_SCHEMA_METADATA,
+            throwable.getMessage() == null
+                ? throwable.getClass().getName()
+                : throwable.getMessage());
+    analysis.setFinishQueryAfterAnalyze(true);
+    analysis.setFailStatus(
+        RpcUtils.getStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION, 
exceptionMessage));
+    setRealStatement(analysis);
+  }
+
   private void executeTabletConversionOnException(
       final IAnalysis analysis, final LoadAnalyzeException e) {
+    if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) {
+      return;
+    }
+
     if (shouldSkipConversion(e)) {
       analysis.setFailStatus(
           new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
@@ -764,6 +789,38 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
     setRealStatement(analysis);
   }
 
+  private boolean setTemporaryUnavailableStatusIfNecessary(
+      final IAnalysis analysis, final Throwable throwable) {
+    if (isTemporaryUnavailableDueToPipeSchemaNotReady(throwable)) {
+      setFailAnalysisForTemporaryUnavailablePipeSchema(analysis, throwable);
+      return true;
+    }
+    if (isGeneratedByPipe && 
LoadTsFileDataTypeConverter.isMemoryPressureException(throwable)) {
+      analysis.setFinishQueryAfterAnalyze(true);
+      
analysis.setFailStatus(LoadTsFileDataTypeConverter.getMemoryPressureStatus(throwable));
+      setRealStatement(analysis);
+      return true;
+    }
+    return false;
+  }
+
+  boolean isTemporaryUnavailableDueToPipeSchemaNotReady(final Throwable 
throwable) {
+    if (!isGeneratedByPipe
+        || !isVerifySchema
+        || 
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+      return false;
+    }
+
+    Throwable current = throwable;
+    while (current != null) {
+      if (current instanceof LoadAnalyzeMissingSchemaException) {
+        return true;
+      }
+      current = current.getCause();
+    }
+    return false;
+  }
+
   private boolean shouldSkipConversion(LoadAnalyzeException e) {
     return (e instanceof LoadAnalyzeTypeMismatchException) && 
!isConvertOnTypeMismatch;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
index d4f4210727e..bb93ac5b2d8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
@@ -36,6 +36,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
+import org.apache.iotdb.db.exception.load.LoadAnalyzeMissingSchemaException;
 import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
 import org.apache.iotdb.db.exception.load.LoadFileException;
 import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
@@ -109,7 +110,7 @@ public class TreeSchemaAutoCreatorAndVerifier {
   public void autoCreateAndVerify(
       TsFileSequenceReader reader,
       Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadataList)
-      throws IOException, AuthException, LoadAnalyzeTypeMismatchException {
+      throws IOException, AuthException, LoadAnalyzeException {
     for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
         device2TimeseriesMetadataList.entrySet()) {
       final IDeviceID device = entry.getKey();
@@ -198,14 +199,14 @@ public class TreeSchemaAutoCreatorAndVerifier {
     schemaCache.clearDeviceIsAlignedCacheIfNecessary();
   }
 
-  public void flush() throws AuthException, LoadAnalyzeTypeMismatchException {
+  public void flush() throws AuthException, LoadAnalyzeException {
     doAutoCreateAndVerify();
 
     schemaCache.clearTimeSeries();
   }
 
   private void doAutoCreateAndVerify()
-      throws SemanticException, AuthException, 
LoadAnalyzeTypeMismatchException {
+      throws SemanticException, AuthException, LoadAnalyzeException {
     if (schemaCache.getDevice2TimeSeries().isEmpty()) {
       return;
     }
@@ -235,6 +236,11 @@ public class TreeSchemaAutoCreatorAndVerifier {
       } else {
         handleException(e, loadTsFileAnalyzer.getStatementString());
       }
+    } catch (LoadAnalyzeMissingSchemaException e) {
+      if (loadTsFileAnalyzer.isTemporaryUnavailableDueToPipeSchemaNotReady(e)) 
{
+        throw e;
+      }
+      handleException(e, loadTsFileAnalyzer.getStatementString());
     } catch (Exception e) {
       if (e.getCause() instanceof LoadAnalyzeTypeMismatchException
           && loadTsFileAnalyzer.isConvertOnTypeMismatch()) {
@@ -449,10 +455,9 @@ public class TreeSchemaAutoCreatorAndVerifier {
                   .collect(Collectors.toList()));
 
       if (iotdbDeviceSchemaInfo == null) {
-        throw new LoadAnalyzeException(
+        throw new LoadAnalyzeMissingSchemaException(
             String.format(
-                "Device %s does not exist in IoTDB and can not be created. "
-                    + "Please check weather auto-create-schema is enabled.",
+                
DataNodeQueryMessages.LOAD_TSFILE_DEVICE_SCHEMA_MISSING_AUTO_CREATE_DISABLED,
                 device));
       }
 
@@ -475,10 +480,9 @@ public class TreeSchemaAutoCreatorAndVerifier {
         final IMeasurementSchema tsFileSchema = tsfileTimeseriesSchemas.get(i);
         final IMeasurementSchema iotdbSchema = iotdbTimeseriesSchemas.get(i);
         if (iotdbSchema == null) {
-          throw new LoadAnalyzeException(
+          throw new LoadAnalyzeMissingSchemaException(
               String.format(
-                  "Measurement %s does not exist in IoTDB and can not be 
created. "
-                      + "Please check weather auto-create-schema is enabled.",
+                  
DataNodeQueryMessages.LOAD_TSFILE_MEASUREMENT_SCHEMA_MISSING_AUTO_CREATE_DISABLED,
                   device + TsFileConstant.PATH_SEPARATOR + 
tsfileTimeseriesSchemas.get(i)));
         }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
index 756d1181825..f2716d5c1a4 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
@@ -24,12 +24,14 @@ import 
org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
 
@@ -64,6 +66,36 @@ public class PipeStatementTsStatusVisitorTest {
             .getCode());
   }
 
+  @Test
+  public void testLoadTemporaryUnavailableClassification() throws Exception {
+    final File tsFile = File.createTempFile("temporary-unavailable", 
".tsfile");
+    tsFile.deleteOnExit();
+
+    Assert.assertEquals(
+        
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(),
+        IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+            .process(
+                LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+                new 
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+                    .setMessage("schema is not ready"))
+            .getCode());
+  }
+
+  @Test
+  public void testLoadFileErrorWithMemoryMessageIsNotClassifiedByMessage() 
throws Exception {
+    final File tsFile = File.createTempFile("memory-error", ".tsfile");
+    tsFile.deleteOnExit();
+
+    Assert.assertEquals(
+        TSStatusCode.LOAD_FILE_ERROR.getStatusCode(),
+        IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+            .process(
+                LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+                new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode())
+                    .setMessage("memory pressure"))
+            .getCode());
+  }
+
   @Test
   public void testDatabaseNotExistRuntimeExceptionClassification() {
     Assert.assertEquals(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
index c533b0043e9..68f4bde29e6 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.path.PartialPath;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
+import org.apache.iotdb.db.exception.load.LoadAnalyzeMissingSchemaException;
 import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
 import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -198,6 +200,37 @@ public class LoadTsFileAnalyzerTest {
     }
   }
 
+  @Test
+  public void 
testPipeGeneratedLoadMissingSchemaShouldBeTemporaryWhenAutoCreateDisabled()
+      throws Exception {
+    final boolean originalAutoCreateSchemaEnabled =
+        IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+    
IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
+    final File tsFile = File.createTempFile("missing-schema", ".tsfile");
+    tsFile.deleteOnExit();
+
+    try (final LoadTsFileAnalyzer analyzer =
+        new LoadTsFileAnalyzer(
+            LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+            true,
+            new MPPQueryContext(new QueryId("load_pipe_test")))) {
+      Assert.assertTrue(
+          analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady(
+              new LoadAnalyzeMissingSchemaException("missing device schema")));
+      Assert.assertTrue(
+          analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady(
+              new RuntimeException(
+                  "wrapped", new LoadAnalyzeMissingSchemaException("missing 
measurement schema"))));
+      Assert.assertFalse(
+          analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady(
+              new LoadAnalyzeException("Data type mismatch for measurement 
root.sg.d1.s1")));
+    } finally {
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setAutoCreateSchemaEnabled(originalAutoCreateSchemaEnabled);
+    }
+  }
+
   private void writeTableTsFileWithMixedDevices(final File tsFile) throws 
Exception {
     if (tsFile.exists()) {
       Assert.assertTrue(tsFile.delete());

Reply via email to