This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1811e13f640 Pipe: Retry history LoadTsFile while waiting for schema (#18031)
1811e13f640 is described below
commit 1811e13f6405a1ea2a38ad856bfa32c92ffdd78a
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 26 14:14:29 2026 +0800
Pipe: Retry history LoadTsFile while waiting for schema (#18031)
* Fix pipe history LoadTsFile schema retry
* Address load tsfile schema retry review
* Fix load tsfile tests with real temp files
---
.../IoTDBPipeReceiverAutoCreateDisabledIT.java | 64 +++++++++++++++++++++-
.../iotdb/db/i18n/DataNodeQueryMessages.java | 6 ++
.../iotdb/db/i18n/DataNodeQueryMessages.java | 6 ++
.../load/LoadAnalyzeMissingSchemaException.java | 27 +++++++++
.../visitor/PipeStatementTSStatusVisitor.java | 4 +-
.../plan/analyze/load/LoadTsFileAnalyzer.java | 57 +++++++++++++++++++
.../load/TreeSchemaAutoCreatorAndVerifier.java | 22 +++++---
.../receiver/PipeStatementTsStatusVisitorTest.java | 32 +++++++++++
.../plan/analyze/load/LoadTsFileAnalyzerTest.java | 33 +++++++++++
9 files changed, 237 insertions(+), 14 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
index 158dd552c20..58aced70ed3 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
@@ -36,6 +36,7 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@@ -61,14 +62,16 @@ public class IoTDBPipeReceiverAutoCreateDisabledIT extends
AbstractPipeDualTreeM
.getConfig()
.getCommonConfig()
.setDataReplicationFactor(1)
- .setSchemaReplicationFactor(1);
+ .setSchemaReplicationFactor(1)
+ .setPipeAutoSplitFullEnabled(true);
receiverEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
.setDatanodeMemoryProportion("3:3:1:1:1:0")
.setDataReplicationFactor(1)
- .setSchemaReplicationFactor(1);
+ .setSchemaReplicationFactor(1)
+ .setPipeAutoSplitFullEnabled(true);
}
@Test
@@ -122,6 +125,63 @@ public class IoTDBPipeReceiverAutoCreateDisabledIT extends
AbstractPipeDualTreeM
}
}
+ @Test
+ public void
testAutoSplitHistoryTsFileWithDeletionWhenReceiverAutoCreateSchemaDisabled()
+ throws Exception {
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "create database root.sg",
+ "create timeseries root.sg.non_aligned.s1 with datatype=INT32",
+ "create timeseries root.sg.non_aligned.s2 with datatype=DOUBLE",
+ "create aligned timeseries root.sg.aligned(s1 INT32, s2 DOUBLE)",
+ "create timeseries root.sg.deleted_measurement.s1 with
datatype=INT32",
+ "create timeseries root.sg.deleted_measurement.s2 with
datatype=DOUBLE",
+ "insert into root.sg.non_aligned(time, s1, s2) values(1, 1, 1.0),
(2, 2, 2.0), (3, 3, 3.0)",
+ "insert into root.sg.aligned(time, s1, s2) values(1, 10, 10.0),
(2, 20, 20.0), (3, 30, 30.0)",
+ "insert into root.sg.deleted_measurement(time, s1, s2) values(1,
100, 100.0), (2, 200, 200.0)",
+ "flush",
+ "delete from root.sg.non_aligned.s1 where time > 2",
+ "delete from root.sg.aligned.* where time > 2",
+ "delete timeseries root.sg.deleted_measurement.s1",
+ "flush"));
+
+ awaitUntilFlush(senderEnv);
+
+ TestUtils.executeNonQuery(
+ senderEnv,
+ String.format(
+ "create pipe test with source ('inclusion'='all',
'source.history.enable'='true', 'source.realtime.mode'='batch') "
+ + "with sink ('sink'='iotdb-thrift-sink',
'sink.node-urls'='%s')",
+ receiverEnv.getDataNodeWrapper(0).getIpAndPortString()));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.sg.non_aligned",
+ "Time,root.sg.non_aligned.s1,root.sg.non_aligned.s2,",
+ new HashSet<>(Arrays.asList("1,1,1.0,", "2,2,2.0,", "3,null,3.0,")));
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.sg.aligned",
+ "Time,root.sg.aligned.s1,root.sg.aligned.s2,",
+ new HashSet<>(Arrays.asList("1,10,10.0,", "2,20,20.0,")));
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.sg.deleted_measurement",
+ "Time,root.sg.deleted_measurement.s2,",
+ new HashSet<>(Arrays.asList("1,100.0,", "2,200.0,")));
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "count timeseries root.sg.deleted_measurement.*",
+ "count(timeseries),",
+ new HashSet<>(Arrays.asList("1,")));
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "show devices root.sg.aligned",
+ "Device,IsAligned,Template,TTL(ms),",
+ new HashSet<>(Arrays.asList("root.sg.aligned,true,null,INF,")));
+ }
+
private QueryResult queryForResult(final Statement statement, final String
sql)
throws SQLException {
try (final ResultSet resultSet = statement.executeQuery(sql)) {
diff --git
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index f870290c09a..96b138a9fe1 100644
---
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -314,6 +314,12 @@ public final class DataNodeQueryMessages {
"Empty file detected, will skip loading this file: {}";
public static final String AUTO_CREATE_OR_VERIFY_SCHEMA_ERROR =
"Auto create or verify schema error.";
+ public static final String
LOAD_TSFILE_DEVICE_SCHEMA_MISSING_AUTO_CREATE_DISABLED =
+ "Device %s does not exist in IoTDB and can not be created. Please check
whether auto-create-schema is enabled.";
+ public static final String
LOAD_TSFILE_MEASUREMENT_SCHEMA_MISSING_AUTO_CREATE_DISABLED =
+ "Measurement %s does not exist in IoTDB and can not be created. Please
check whether auto-create-schema is enabled.";
+ public static final String
PIPE_GENERATED_LOAD_TSFILE_WAITING_FOR_SCHEMA_METADATA =
+ "Pipe generated LoadTsFile is waiting for schema metadata to be
transferred. Detail: %s";
public static final String FAILED_TO_FIND_TAG_COLUMN_MAPPING_FOR_TABLE =
"Failed to find tag column mapping for table {}";
public static final String AUTO_CREATE_DATABASE_FAILED_BECAUSE =
diff --git
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index b0dae59e198..aa2436c442f 100644
---
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -312,6 +312,12 @@ public final class DataNodeQueryMessages {
"检测到空文件,将跳过加载此文件:{}";
public static final String AUTO_CREATE_OR_VERIFY_SCHEMA_ERROR =
"自动创建或验证 schema 出错。";
+ public static final String
LOAD_TSFILE_DEVICE_SCHEMA_MISSING_AUTO_CREATE_DISABLED =
+ "设备 %s 在 IoTDB 中不存在且无法被创建。请检查是否启用了 auto-create-schema。";
+ public static final String
LOAD_TSFILE_MEASUREMENT_SCHEMA_MISSING_AUTO_CREATE_DISABLED =
+ "时间序列 %s 在 IoTDB 中不存在且无法被创建。请检查是否启用了 auto-create-schema。";
+ public static final String
PIPE_GENERATED_LOAD_TSFILE_WAITING_FOR_SCHEMA_METADATA =
+ "Pipe 生成的 LoadTsFile 正在等待 schema 元数据传输完成。详情:%s";
public static final String FAILED_TO_FIND_TAG_COLUMN_MAPPING_FOR_TABLE =
"未找到表 {} 的标签列映射";
public static final String AUTO_CREATE_DATABASE_FAILED_BECAUSE =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadAnalyzeMissingSchemaException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadAnalyzeMissingSchemaException.java
new file mode 100644
index 00000000000..9f5e129d56f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadAnalyzeMissingSchemaException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception.load;
+
+public class LoadAnalyzeMissingSchemaException extends LoadAnalyzeException {
+
+ public LoadAnalyzeMissingSchemaException(final String message) {
+ super(message);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index ff2805b2e7d..df316b79367 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -74,9 +74,7 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
public TSStatus visitLoadFile(
final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
- || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
- && status.getMessage() != null
- && status.getMessage().contains("memory")) {
+ || status.getCode() ==
TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
.setMessage(status.getMessage());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index e357b86e0c7..b1bdd077dc4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.queryengine.common.SqlDialect;
import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
+import org.apache.iotdb.db.exception.load.LoadAnalyzeMissingSchemaException;
import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
@@ -222,6 +223,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
executeTabletConversionOnException(analysis, e);
return analysis;
} catch (Exception e) {
+ if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) {
+ return analysis;
+ }
final String exceptionMessage =
String.format(
"Auto create or verify schema error when executing statement %s.
Detail: %s.",
@@ -346,6 +350,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
"The file %s is not a valid tsfile. Please check the input
file.",
tsFile.getPath()));
} catch (Exception e) {
+ if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) {
+ return false;
+ }
final String exceptionMessage =
String.format(
"Loading file %s failed. Detail: %s",
@@ -681,8 +688,26 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
analysis.setFailStatus(RpcUtils.getStatus(e.getCode(), e.getMessage()));
}
+ private void setFailAnalysisForTemporaryUnavailablePipeSchema(
+ final IAnalysis analysis, final Throwable throwable) {
+ final String exceptionMessage =
+ String.format(
+
DataNodeQueryMessages.PIPE_GENERATED_LOAD_TSFILE_WAITING_FOR_SCHEMA_METADATA,
+ throwable.getMessage() == null
+ ? throwable.getClass().getName()
+ : throwable.getMessage());
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(
+ RpcUtils.getStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION,
exceptionMessage));
+ setRealStatement(analysis);
+ }
+
private void executeTabletConversionOnException(
final IAnalysis analysis, final LoadAnalyzeException e) {
+ if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) {
+ return;
+ }
+
if (shouldSkipConversion(e)) {
analysis.setFailStatus(
new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
@@ -764,6 +789,38 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
setRealStatement(analysis);
}
+ private boolean setTemporaryUnavailableStatusIfNecessary(
+ final IAnalysis analysis, final Throwable throwable) {
+ if (isTemporaryUnavailableDueToPipeSchemaNotReady(throwable)) {
+ setFailAnalysisForTemporaryUnavailablePipeSchema(analysis, throwable);
+ return true;
+ }
+ if (isGeneratedByPipe &&
LoadTsFileDataTypeConverter.isMemoryPressureException(throwable)) {
+ analysis.setFinishQueryAfterAnalyze(true);
+
analysis.setFailStatus(LoadTsFileDataTypeConverter.getMemoryPressureStatus(throwable));
+ setRealStatement(analysis);
+ return true;
+ }
+ return false;
+ }
+
+ boolean isTemporaryUnavailableDueToPipeSchemaNotReady(final Throwable
throwable) {
+ if (!isGeneratedByPipe
+ || !isVerifySchema
+ ||
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+ return false;
+ }
+
+ Throwable current = throwable;
+ while (current != null) {
+ if (current instanceof LoadAnalyzeMissingSchemaException) {
+ return true;
+ }
+ current = current.getCause();
+ }
+ return false;
+ }
+
private boolean shouldSkipConversion(LoadAnalyzeException e) {
return (e instanceof LoadAnalyzeTypeMismatchException) &&
!isConvertOnTypeMismatch;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
index d4f4210727e..bb93ac5b2d8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
+import org.apache.iotdb.db.exception.load.LoadAnalyzeMissingSchemaException;
import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadFileException;
import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
@@ -109,7 +110,7 @@ public class TreeSchemaAutoCreatorAndVerifier {
public void autoCreateAndVerify(
TsFileSequenceReader reader,
Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadataList)
- throws IOException, AuthException, LoadAnalyzeTypeMismatchException {
+ throws IOException, AuthException, LoadAnalyzeException {
for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
device2TimeseriesMetadataList.entrySet()) {
final IDeviceID device = entry.getKey();
@@ -198,14 +199,14 @@ public class TreeSchemaAutoCreatorAndVerifier {
schemaCache.clearDeviceIsAlignedCacheIfNecessary();
}
- public void flush() throws AuthException, LoadAnalyzeTypeMismatchException {
+ public void flush() throws AuthException, LoadAnalyzeException {
doAutoCreateAndVerify();
schemaCache.clearTimeSeries();
}
private void doAutoCreateAndVerify()
- throws SemanticException, AuthException,
LoadAnalyzeTypeMismatchException {
+ throws SemanticException, AuthException, LoadAnalyzeException {
if (schemaCache.getDevice2TimeSeries().isEmpty()) {
return;
}
@@ -235,6 +236,11 @@ public class TreeSchemaAutoCreatorAndVerifier {
} else {
handleException(e, loadTsFileAnalyzer.getStatementString());
}
+ } catch (LoadAnalyzeMissingSchemaException e) {
+ if (loadTsFileAnalyzer.isTemporaryUnavailableDueToPipeSchemaNotReady(e))
{
+ throw e;
+ }
+ handleException(e, loadTsFileAnalyzer.getStatementString());
} catch (Exception e) {
if (e.getCause() instanceof LoadAnalyzeTypeMismatchException
&& loadTsFileAnalyzer.isConvertOnTypeMismatch()) {
@@ -449,10 +455,9 @@ public class TreeSchemaAutoCreatorAndVerifier {
.collect(Collectors.toList()));
if (iotdbDeviceSchemaInfo == null) {
- throw new LoadAnalyzeException(
+ throw new LoadAnalyzeMissingSchemaException(
String.format(
- "Device %s does not exist in IoTDB and can not be created. "
- + "Please check weather auto-create-schema is enabled.",
+
DataNodeQueryMessages.LOAD_TSFILE_DEVICE_SCHEMA_MISSING_AUTO_CREATE_DISABLED,
device));
}
@@ -475,10 +480,9 @@ public class TreeSchemaAutoCreatorAndVerifier {
final IMeasurementSchema tsFileSchema = tsfileTimeseriesSchemas.get(i);
final IMeasurementSchema iotdbSchema = iotdbTimeseriesSchemas.get(i);
if (iotdbSchema == null) {
- throw new LoadAnalyzeException(
+ throw new LoadAnalyzeMissingSchemaException(
String.format(
- "Measurement %s does not exist in IoTDB and can not be
created. "
- + "Please check weather auto-create-schema is enabled.",
+
DataNodeQueryMessages.LOAD_TSFILE_MEASUREMENT_SCHEMA_MISSING_AUTO_CREATE_DISABLED,
device + TsFileConstant.PATH_SEPARATOR +
tsfileTimeseriesSchemas.get(i)));
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
index 756d1181825..f2716d5c1a4 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
@@ -24,12 +24,14 @@ import
org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
import org.junit.Test;
+import java.io.File;
import java.util.Arrays;
import java.util.Collections;
@@ -64,6 +66,36 @@ public class PipeStatementTsStatusVisitorTest {
.getCode());
}
+ @Test
+ public void testLoadTemporaryUnavailableClassification() throws Exception {
+ final File tsFile = File.createTempFile("temporary-unavailable",
".tsfile");
+ tsFile.deleteOnExit();
+
+ Assert.assertEquals(
+
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(),
+ IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+ .process(
+ LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+ new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage("schema is not ready"))
+ .getCode());
+ }
+
+ @Test
+ public void testLoadFileErrorWithMemoryMessageIsNotClassifiedByMessage()
throws Exception {
+ final File tsFile = File.createTempFile("memory-error", ".tsfile");
+ tsFile.deleteOnExit();
+
+ Assert.assertEquals(
+ TSStatusCode.LOAD_FILE_ERROR.getStatusCode(),
+ IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+ .process(
+ LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+ new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode())
+ .setMessage("memory pressure"))
+ .getCode());
+ }
+
@Test
public void testDatabaseNotExistRuntimeExceptionClassification() {
Assert.assertEquals(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
index c533b0043e9..68f4bde29e6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.path.PartialPath;
import
org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
+import org.apache.iotdb.db.exception.load.LoadAnalyzeMissingSchemaException;
import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -198,6 +200,37 @@ public class LoadTsFileAnalyzerTest {
}
}
+ @Test
+ public void
testPipeGeneratedLoadMissingSchemaShouldBeTemporaryWhenAutoCreateDisabled()
+ throws Exception {
+ final boolean originalAutoCreateSchemaEnabled =
+ IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+
IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
+ final File tsFile = File.createTempFile("missing-schema", ".tsfile");
+ tsFile.deleteOnExit();
+
+ try (final LoadTsFileAnalyzer analyzer =
+ new LoadTsFileAnalyzer(
+ LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+ true,
+ new MPPQueryContext(new QueryId("load_pipe_test")))) {
+ Assert.assertTrue(
+ analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady(
+ new LoadAnalyzeMissingSchemaException("missing device schema")));
+ Assert.assertTrue(
+ analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady(
+ new RuntimeException(
+ "wrapped", new LoadAnalyzeMissingSchemaException("missing
measurement schema"))));
+ Assert.assertFalse(
+ analyzer.isTemporaryUnavailableDueToPipeSchemaNotReady(
+ new LoadAnalyzeException("Data type mismatch for measurement
root.sg.d1.s1")));
+ } finally {
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setAutoCreateSchemaEnabled(originalAutoCreateSchemaEnabled);
+ }
+ }
+
private void writeTableTsFileWithMixedDevices(final File tsFile) throws
Exception {
if (tsFile.exists()) {
Assert.assertTrue(tsFile.delete());