This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 9ad7c19788c Pipe: Retry history LoadTsFile while waiting for schema (#18031) (#18042)
9ad7c19788c is described below
commit 9ad7c19788c828533d1ef1a5f0446f81725dfd28
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 30 10:19:25 2026 +0800
Pipe: Retry history LoadTsFile while waiting for schema (#18031) (#18042)
* Fix pipe history LoadTsFile schema retry
* Address load tsfile schema retry review
* Fix load tsfile tests with real temp files
(cherry picked from commit 1811e13f6405a1ea2a38ad856bfa32c92ffdd78a)
---
.../LoadAnalyzeMissingSchemaException.java | 27 ++++++++
.../visitor/PipeStatementTSStatusVisitor.java | 4 +-
.../plan/analyze/load/LoadTsFileAnalyzer.java | 80 ++++++++++++++++++++--
.../receiver/PipeStatementTsStatusVisitorTest.java | 32 +++++++++
4 files changed, 133 insertions(+), 10 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadAnalyzeMissingSchemaException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadAnalyzeMissingSchemaException.java
new file mode 100644
index 00000000000..42da0e818a9
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadAnalyzeMissingSchemaException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception;
+
+public class LoadAnalyzeMissingSchemaException extends LoadAnalyzeException {
+
+ public LoadAnalyzeMissingSchemaException(final String message) {
+ super(message);
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index d370bff2798..509be0301cc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -74,9 +74,7 @@ public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus, TSS
public TSStatus visitLoadFile(
final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
- || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
- && status.getMessage() != null
- && status.getMessage().contains("memory")) {
+ || status.getCode() ==
TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
.setMessage(status.getMessage());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 5902c342ec5..d47953abe9b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.LoadAnalyzeException;
+import org.apache.iotdb.db.exception.LoadAnalyzeMissingSchemaException;
import org.apache.iotdb.db.exception.LoadAnalyzeTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
import org.apache.iotdb.db.exception.load.LoadFileException;
@@ -214,6 +215,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
// the real result on the conversion will be set in the analysis.
return analysis;
} catch (Exception e) {
+ if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) {
+ return analysis;
+ }
final String exceptionMessage =
String.format(
"Auto create or verify schema error when executing statement %s.
Detail: %s.",
@@ -315,6 +319,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
"The file %s is not a valid tsfile. Please check the input
file.",
tsFile.getPath()));
} catch (Exception e) {
+ if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) {
+ return false;
+ }
final String exceptionMessage =
String.format(
"Loading file %s failed. Detail: %s",
@@ -484,6 +491,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private Analysis executeTabletConversionOnException(
final Analysis analysis, final LoadAnalyzeException e) {
+ if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) {
+ return analysis;
+ }
+
if (shouldSkipConversion(e)) {
analysis.setFailStatus(
new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
@@ -517,6 +528,52 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
return analysis;
}
+ private boolean setTemporaryUnavailableStatusIfNecessary(
+ final Analysis analysis, final Throwable throwable) {
+ if (isTemporaryUnavailableDueToPipeSchemaNotReady(throwable)) {
+ setFailAnalysisForTemporaryUnavailablePipeSchema(analysis, throwable);
+ return true;
+ }
+ if (isGeneratedByPipe &&
LoadTsFileDataTypeConverter.isMemoryPressureException(throwable)) {
+ analysis.setFinishQueryAfterAnalyze(true);
+
analysis.setFailStatus(LoadTsFileDataTypeConverter.getMemoryPressureStatus(throwable));
+ analysis.setStatement(loadTsFileStatement);
+ return true;
+ }
+ return false;
+ }
+
+ private void setFailAnalysisForTemporaryUnavailablePipeSchema(
+ final Analysis analysis, final Throwable throwable) {
+ final String exceptionMessage =
+ String.format(
+ "Pipe generated LoadTsFile is waiting for schema metadata to be
transferred. Detail: %s",
+ throwable.getMessage() == null
+ ? throwable.getClass().getName()
+ : throwable.getMessage());
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(
+ RpcUtils.getStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION,
exceptionMessage));
+ analysis.setStatement(loadTsFileStatement);
+ }
+
+ boolean isTemporaryUnavailableDueToPipeSchemaNotReady(final Throwable
throwable) {
+ if (!isGeneratedByPipe
+ || !isVerifySchema
+ ||
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+ return false;
+ }
+
+ Throwable current = throwable;
+ while (current != null) {
+ if (current instanceof LoadAnalyzeMissingSchemaException) {
+ return true;
+ }
+ current = current.getCause();
+ }
+ return false;
+ }
+
private boolean shouldSkipConversion(LoadAnalyzeException e) {
return (e instanceof LoadAnalyzeTypeMismatchException)
&& !loadTsFileStatement.isConvertOnTypeMismatch();
@@ -545,7 +602,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
public void autoCreateAndVerify(
TsFileSequenceReader reader,
Map<IDeviceID, List<TimeseriesMetadata>> device2TimeSeriesMetadataList)
- throws IOException, AuthException, LoadAnalyzeTypeMismatchException {
+ throws IOException, AuthException, LoadAnalyzeException {
for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
device2TimeSeriesMetadataList.entrySet()) {
final IDeviceID device = entry.getKey();
@@ -647,14 +704,14 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
schemaCache.clearDeviceIsAlignedCacheIfNecessary();
}
- public void flush() throws AuthException, LoadAnalyzeTypeMismatchException
{
+ public void flush() throws AuthException, LoadAnalyzeException {
doAutoCreateAndVerify();
schemaCache.clearTimeSeries();
}
private void doAutoCreateAndVerify()
- throws SemanticException, AuthException,
LoadAnalyzeTypeMismatchException {
+ throws SemanticException, AuthException, LoadAnalyzeException {
if (schemaCache.getDevice2TimeSeries().isEmpty()) {
return;
}
@@ -677,6 +734,15 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
}
} catch (AuthException | LoadAnalyzeTypeMismatchException e) {
throw e;
+ } catch (LoadAnalyzeMissingSchemaException e) {
+ if (isTemporaryUnavailableDueToPipeSchemaNotReady(e)) {
+ throw e;
+ }
+ LOGGER.warn("Auto create or verify schema error.", e);
+ throw new SemanticException(
+ String.format(
+ "Auto create or verify schema error when executing statement
%s. Detail: %s.",
+ loadTsFileStatement, e.getMessage()));
} catch (Exception e) {
if (e.getCause() instanceof LoadAnalyzeTypeMismatchException &&
isConvertOnTypeMismatch) {
throw (LoadAnalyzeTypeMismatchException) e.getCause();
@@ -863,10 +929,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
.collect(Collectors.toList()));
if (iotdbDeviceSchemaInfo == null) {
- throw new LoadAnalyzeException(
+ throw new LoadAnalyzeMissingSchemaException(
String.format(
"Device %s does not exist in IoTDB and can not be created. "
- + "Please check weather auto-create-schema is enabled.",
+ + "Please check whether auto-create-schema is enabled.",
device));
}
@@ -889,10 +955,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
final MeasurementSchema tsFileSchema =
tsfileTimeseriesSchemas.get(i);
final MeasurementSchema iotdbSchema = iotdbTimeseriesSchemas.get(i);
if (iotdbSchema == null) {
- throw new LoadAnalyzeException(
+ throw new LoadAnalyzeMissingSchemaException(
String.format(
"Measurement %s does not exist in IoTDB and can not be
created. "
- + "Please check weather auto-create-schema is
enabled.",
+ + "Please check whether auto-create-schema is
enabled.",
device + TsFileConstant.PATH_SEPARATOR +
tsfileTimeseriesSchemas.get(i)));
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
index 756d1181825..f2716d5c1a4 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
@@ -24,12 +24,14 @@ import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
import org.junit.Test;
+import java.io.File;
import java.util.Arrays;
import java.util.Collections;
@@ -64,6 +66,36 @@ public class PipeStatementTsStatusVisitorTest {
.getCode());
}
+ @Test
+ public void testLoadTemporaryUnavailableClassification() throws Exception {
+ final File tsFile = File.createTempFile("temporary-unavailable",
".tsfile");
+ tsFile.deleteOnExit();
+
+ Assert.assertEquals(
+
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(),
+ IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+ .process(
+ LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+ new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage("schema is not ready"))
+ .getCode());
+ }
+
+ @Test
+ public void testLoadFileErrorWithMemoryMessageIsNotClassifiedByMessage()
throws Exception {
+ final File tsFile = File.createTempFile("memory-error", ".tsfile");
+ tsFile.deleteOnExit();
+
+ Assert.assertEquals(
+ TSStatusCode.LOAD_FILE_ERROR.getStatusCode(),
+ IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+ .process(
+ LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+ new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode())
+ .setMessage("memory pressure"))
+ .getCode());
+ }
+
@Test
public void testDatabaseNotExistRuntimeExceptionClassification() {
Assert.assertEquals(