This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 25d51fed24a Load: Convert to tablets when node is read-only (#15693) 
(#15774)
25d51fed24a is described below

commit 25d51fed24a35eae85c6e9bdf08c37622e9a2f81
Author: Zikun Ma <[email protected]>
AuthorDate: Wed Jun 18 18:41:53 2025 +0800

    Load: Convert to tablets when node is read-only (#15693) (#15774)
---
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  1 +
 .../analyze/{ => load}/LoadTsFileAnalyzer.java     | 13 +++++------
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 27 +++++++---------------
 3 files changed, 15 insertions(+), 26 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index afc964ea301..b4f163838d9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -71,6 +71,7 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.window.ainode.Inferenc
 import 
org.apache.iotdb.db.queryengine.execution.operator.window.ainode.InferenceWindowType;
 import 
org.apache.iotdb.db.queryengine.execution.operator.window.ainode.TailInferenceWindow;
 import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
+import org.apache.iotdb.db.queryengine.plan.analyze.load.LoadTsFileAnalyzer;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
 import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 19d5cd636d4..2d45a237e73 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.plan.analyze;
+package org.apache.iotdb.db.queryengine.plan.analyze.load;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
@@ -40,7 +40,6 @@ import org.apache.iotdb.db.exception.LoadAnalyzeException;
 import org.apache.iotdb.db.exception.LoadAnalyzeTypeMismatchException;
 import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
 import org.apache.iotdb.db.exception.load.LoadFileException;
-import org.apache.iotdb.db.exception.load.LoadReadOnlyException;
 import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
@@ -51,6 +50,8 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
@@ -157,7 +158,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
   private final boolean isConvertOnTypeMismatch;
   private final long tabletConversionThresholdBytes;
 
-  LoadTsFileAnalyzer(
+  public LoadTsFileAnalyzer(
       LoadTsFileStatement loadTsFileStatement,
       MPPQueryContext context,
       IPartitionFetcher partitionFetcher,
@@ -310,10 +311,8 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
   private boolean checkBeforeAnalyzeFileByFile(Analysis analysis) {
     // check if the system is read only
     if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
-      analysis.setFinishQueryAfterAnalyze(true);
-      analysis.setFailStatus(
-          RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY, 
LoadReadOnlyException.MESSAGE));
-      return false;
+      LOGGER.info(
+          "LoadTsFileAnalyzer: Current datanode is read only, will try to 
convert to tablets and insert later.");
     }
 
     return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 43465d7ed74..fa24b64fc40 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -259,7 +259,6 @@ public class LoadTsFileScheduler implements IScheduler {
         } catch (Exception e) {
           isLoadSuccess = false;
           failedTsFileNodeIndexes.add(i);
-          stateMachine.transitionToFailed(e);
           LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath, 
e);
         } finally {
           if (shouldRemoveFileFromLoadingSet) {
@@ -308,11 +307,9 @@ public class LoadTsFileScheduler implements IScheduler {
               node.getTsFileResource().getTsFile(), 
tsFileDataManager::addOrSendTsFileData)
           .splitTsFileByDataPartition();
       if (!tsFileDataManager.sendAllTsFileData()) {
-        stateMachine.transitionToFailed(new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
         return false;
       }
     } catch (IllegalStateException e) {
-      stateMachine.transitionToFailed(e);
       LOGGER.warn(
           String.format(
               "Dispatch TsFileData error when parsing TsFile %s.",
@@ -320,7 +317,6 @@ public class LoadTsFileScheduler implements IScheduler {
           e);
       return false;
     } catch (Exception e) {
-      stateMachine.transitionToFailed(e);
       LOGGER.warn(
           String.format("Parse or send TsFile %s error.", 
node.getTsFileResource().getTsFile()), e);
       return false;
@@ -350,7 +346,6 @@ public class LoadTsFileScheduler implements IScheduler {
           dispatchResultFuture.get(
               CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds(), 
TimeUnit.SECONDS);
       if (!result.isSuccessful()) {
-        // TODO: retry.
         LOGGER.warn(
             "Dispatch one piece to ReplicaSet {} error. Result status code {}. 
"
                 + "Result status message {}. Dispatch piece node error:%n{}",
@@ -370,7 +365,6 @@ public class LoadTsFileScheduler implements IScheduler {
         status.setMessage(
             String.format("Load %s piece error in 1st phase. Because ", 
pieceNode.getTsFile())
                 + status.getMessage());
-        stateMachine.transitionToFailed(status); // TODO: record more status
         return false;
       }
     } catch (InterruptedException | ExecutionException | CancellationException 
e) {
@@ -378,13 +372,11 @@ public class LoadTsFileScheduler implements IScheduler {
         Thread.currentThread().interrupt();
       }
       LOGGER.warn("Interrupt or Execution error.", e);
-      stateMachine.transitionToFailed(e);
       return false;
     } catch (TimeoutException e) {
       dispatchResultFuture.cancel(true);
       LOGGER.warn(
           String.format("Wait for loading %s time out.", 
LoadTsFilePieceNode.class.getName()), e);
-      stateMachine.transitionToFailed(e);
       return false;
     }
     return true;
@@ -425,7 +417,6 @@ public class LoadTsFileScheduler implements IScheduler {
 
       FragInstanceDispatchResult result = dispatchResultFuture.get();
       if (!result.isSuccessful()) {
-        // TODO: retry.
         LOGGER.warn(
             "Dispatch load command {} of TsFile {} error to replicaSets {} 
error. "
                 + "Result status code {}. Result status message {}.",
@@ -439,7 +430,6 @@ public class LoadTsFileScheduler implements IScheduler {
             String.format(
                 "Load %s error in second phase. Because %s, first phase is %s",
                 tsFile, status.getMessage(), isFirstPhaseSuccess ? "success" : 
"failed"));
-        stateMachine.transitionToFailed(status);
         return false;
       }
     } catch (InterruptedException | ExecutionException e) {
@@ -447,11 +437,9 @@ public class LoadTsFileScheduler implements IScheduler {
         Thread.currentThread().interrupt();
       }
       LOGGER.warn("Interrupt or Execution error.", e);
-      stateMachine.transitionToFailed(e);
       return false;
     } catch (Exception e) {
       LOGGER.warn("Exception occurred during second phase of loading TsFile 
{}.", tsFile, e);
-      stateMachine.transitionToFailed(e);
       return false;
     }
     return true;
@@ -493,7 +481,6 @@ public class LoadTsFileScheduler implements IScheduler {
               node.getTsFileResource().getTsFile(),
               TSStatusCode.representOf(e.getFailureStatus().getCode()).name(),
               e.getFailureStatus().getMessage()));
-      stateMachine.transitionToFailed(e.getFailureStatus());
       return false;
     }
 
@@ -589,14 +576,16 @@ public class LoadTsFileScheduler implements IScheduler {
     // If all failed TsFiles are converted into tablets and inserted,
     // we can consider the load process as successful.
     if (failedTsFileNodeIndexes.isEmpty()) {
+      LOGGER.info("Load: all failed TsFiles are converted to tablets and 
inserted.");
       stateMachine.transitionToFinished();
     } else {
-      stateMachine.transitionToFailed(
-          new LoadFileException(
-              "Failed to load some TsFiles by converting them into tablets. 
Failed TsFiles: "
-                  + failedTsFileNodeIndexes.stream()
-                      .map(i -> 
tsFileNodeList.get(i).getTsFileResource().getTsFilePath())
-                      .collect(Collectors.joining(", "))));
+      final String errorMsg =
+          "Load: failed to load some TsFiles by converting them into tablets. 
Failed TsFiles: "
+              + failedTsFileNodeIndexes.stream()
+                  .map(i -> 
tsFileNodeList.get(i).getTsFileResource().getTsFilePath())
+                  .collect(Collectors.joining(", "));
+      LOGGER.warn(errorMsg);
+      stateMachine.transitionToFailed(new LoadFileException(errorMsg));
     }
   }
 

Reply via email to