This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 25d51fed24a Load: Convert to tablets when node is read-only (#15693) (#15774)
25d51fed24a is described below
commit 25d51fed24a35eae85c6e9bdf08c37622e9a2f81
Author: Zikun Ma <[email protected]>
AuthorDate: Wed Jun 18 18:41:53 2025 +0800
Load: Convert to tablets when node is read-only (#15693) (#15774)
---
.../queryengine/plan/analyze/AnalyzeVisitor.java | 1 +
.../analyze/{ => load}/LoadTsFileAnalyzer.java | 13 +++++------
.../plan/scheduler/load/LoadTsFileScheduler.java | 27 +++++++---------------
3 files changed, 15 insertions(+), 26 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index afc964ea301..b4f163838d9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -71,6 +71,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.window.ainode.Inferenc
import org.apache.iotdb.db.queryengine.execution.operator.window.ainode.InferenceWindowType;
import org.apache.iotdb.db.queryengine.execution.operator.window.ainode.TailInferenceWindow;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
+import org.apache.iotdb.db.queryengine.plan.analyze.load.LoadTsFileAnalyzer;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
similarity index 99%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 19d5cd636d4..2d45a237e73 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.plan.analyze;
+package org.apache.iotdb.db.queryengine.plan.analyze.load;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
@@ -40,7 +40,6 @@ import org.apache.iotdb.db.exception.LoadAnalyzeException;
import org.apache.iotdb.db.exception.LoadAnalyzeTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
import org.apache.iotdb.db.exception.load.LoadFileException;
-import org.apache.iotdb.db.exception.load.LoadReadOnlyException;
import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
@@ -51,6 +50,8 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
@@ -157,7 +158,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private final boolean isConvertOnTypeMismatch;
private final long tabletConversionThresholdBytes;
- LoadTsFileAnalyzer(
+ public LoadTsFileAnalyzer(
LoadTsFileStatement loadTsFileStatement,
MPPQueryContext context,
IPartitionFetcher partitionFetcher,
@@ -310,10 +311,8 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private boolean checkBeforeAnalyzeFileByFile(Analysis analysis) {
// check if the system is read only
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
- analysis.setFinishQueryAfterAnalyze(true);
- analysis.setFailStatus(
- RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY, LoadReadOnlyException.MESSAGE));
- return false;
+ LOGGER.info(
+ "LoadTsFileAnalyzer: Current datanode is read only, will try to convert to tablets and insert later.");
}
return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 43465d7ed74..fa24b64fc40 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -259,7 +259,6 @@ public class LoadTsFileScheduler implements IScheduler {
} catch (Exception e) {
isLoadSuccess = false;
failedTsFileNodeIndexes.add(i);
- stateMachine.transitionToFailed(e);
LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath, e);
} finally {
if (shouldRemoveFileFromLoadingSet) {
@@ -308,11 +307,9 @@ public class LoadTsFileScheduler implements IScheduler {
node.getTsFileResource().getTsFile(),
tsFileDataManager::addOrSendTsFileData)
.splitTsFileByDataPartition();
if (!tsFileDataManager.sendAllTsFileData()) {
- stateMachine.transitionToFailed(new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
return false;
}
} catch (IllegalStateException e) {
- stateMachine.transitionToFailed(e);
LOGGER.warn(
String.format(
"Dispatch TsFileData error when parsing TsFile %s.",
@@ -320,7 +317,6 @@ public class LoadTsFileScheduler implements IScheduler {
e);
return false;
} catch (Exception e) {
- stateMachine.transitionToFailed(e);
LOGGER.warn(
String.format("Parse or send TsFile %s error.", node.getTsFileResource().getTsFile()), e);
return false;
@@ -350,7 +346,6 @@ public class LoadTsFileScheduler implements IScheduler {
dispatchResultFuture.get(
CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds(),
TimeUnit.SECONDS);
if (!result.isSuccessful()) {
- // TODO: retry.
LOGGER.warn(
"Dispatch one piece to ReplicaSet {} error. Result status code {}. "
+ "Result status message {}. Dispatch piece node error:%n{}",
@@ -370,7 +365,6 @@ public class LoadTsFileScheduler implements IScheduler {
status.setMessage(
String.format("Load %s piece error in 1st phase. Because ", pieceNode.getTsFile())
+ status.getMessage());
- stateMachine.transitionToFailed(status); // TODO: record more status
return false;
}
} catch (InterruptedException | ExecutionException | CancellationException e) {
@@ -378,13 +372,11 @@ public class LoadTsFileScheduler implements IScheduler {
Thread.currentThread().interrupt();
}
LOGGER.warn("Interrupt or Execution error.", e);
- stateMachine.transitionToFailed(e);
return false;
} catch (TimeoutException e) {
dispatchResultFuture.cancel(true);
LOGGER.warn(
String.format("Wait for loading %s time out.", LoadTsFilePieceNode.class.getName()), e);
- stateMachine.transitionToFailed(e);
return false;
}
return true;
@@ -425,7 +417,6 @@ public class LoadTsFileScheduler implements IScheduler {
FragInstanceDispatchResult result = dispatchResultFuture.get();
if (!result.isSuccessful()) {
- // TODO: retry.
LOGGER.warn(
"Dispatch load command {} of TsFile {} error to replicaSets {} error. "
+ "Result status code {}. Result status message {}.",
@@ -439,7 +430,6 @@ public class LoadTsFileScheduler implements IScheduler {
String.format(
"Load %s error in second phase. Because %s, first phase is %s",
tsFile, status.getMessage(), isFirstPhaseSuccess ? "success" : "failed"));
- stateMachine.transitionToFailed(status);
return false;
}
} catch (InterruptedException | ExecutionException e) {
@@ -447,11 +437,9 @@ public class LoadTsFileScheduler implements IScheduler {
Thread.currentThread().interrupt();
}
LOGGER.warn("Interrupt or Execution error.", e);
- stateMachine.transitionToFailed(e);
return false;
} catch (Exception e) {
LOGGER.warn("Exception occurred during second phase of loading TsFile {}.", tsFile, e);
- stateMachine.transitionToFailed(e);
return false;
}
return true;
@@ -493,7 +481,6 @@ public class LoadTsFileScheduler implements IScheduler {
node.getTsFileResource().getTsFile(),
TSStatusCode.representOf(e.getFailureStatus().getCode()).name(),
e.getFailureStatus().getMessage()));
- stateMachine.transitionToFailed(e.getFailureStatus());
return false;
}
@@ -589,14 +576,16 @@ public class LoadTsFileScheduler implements IScheduler {
// If all failed TsFiles are converted into tablets and inserted,
// we can consider the load process as successful.
if (failedTsFileNodeIndexes.isEmpty()) {
+ LOGGER.info("Load: all failed TsFiles are converted to tablets and inserted.");
stateMachine.transitionToFinished();
} else {
- stateMachine.transitionToFailed(
- new LoadFileException(
- "Failed to load some TsFiles by converting them into tablets. Failed TsFiles: "
- + failedTsFileNodeIndexes.stream()
- .map(i -> tsFileNodeList.get(i).getTsFileResource().getTsFilePath())
- .collect(Collectors.joining(", "))));
+ final String errorMsg =
+ "Load: failed to load some TsFiles by converting them into tablets. Failed TsFiles: "
+ + failedTsFileNodeIndexes.stream()
+ .map(i -> tsFileNodeList.get(i).getTsFileResource().getTsFilePath())
+ .collect(Collectors.joining(", "));
+ LOGGER.warn(errorMsg);
+ stateMachine.transitionToFailed(new LoadFileException(errorMsg));
}
}