This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c10c95f25d8 load-tsfile script: disable redirection & load: handle exceptions using the Analysis objects instead of throwing exceptions & pipe: handle SYSTEM_READ_ONLY correctly on receiver side (#12716)
c10c95f25d8 is described below

commit c10c95f25d8ba558573e41cfdce55b2521712e4f
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jun 12 18:43:53 2024 +0800

    load-tsfile script: disable redirection & load: handle exceptions using the Analysis objects instead of throwing exceptions & pipe: handle SYSTEM_READ_ONLY correctly on receiver side (#12716)
---
 .../java/org/apache/iotdb/tool/ImportTsFile.java   |  4 +-
 .../visitor/PipeStatementExceptionVisitor.java     |  2 +-
 .../visitor/PipeStatementTSStatusVisitor.java      | 21 +++++++++-
 .../queryengine/load/LoadTsFileMemoryManager.java  | 10 ++---
 .../queryengine/plan/analyze/AnalyzeVisitor.java   | 18 +++++---
 .../plan/analyze/LoadTsfileAnalyzer.java           | 48 ++++++++++++----------
 .../plan/planner/LocalExecutionPlanner.java        | 16 +++++---
 7 files changed, 79 insertions(+), 40 deletions(-)

diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
index 4e74dbe5f59..320b03334df 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
@@ -310,7 +310,9 @@ public class ImportTsFile extends AbstractTsFileTool {
       }
 
       sessionPool =
-          new SessionPool(host, Integer.parseInt(port), username, password, 
threadNum + 1);
+          new SessionPool(
+              host, Integer.parseInt(port), username, password, threadNum + 1, 
false, false);
+      sessionPool.setEnableQueryRedirection(false);
 
       traverseAndCollectFiles(file);
       addNoResourceOrModsToQueue();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
index 7cbe5bed416..5a65b1a5e05 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
@@ -61,7 +61,7 @@ public class PipeStatementExceptionVisitor extends 
StatementVisitor<TSStatus, Ex
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     }
-    return super.visitLoadFile(loadTsFileStatement, context);
+    return visitStatement(loadTsFileStatement, context);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index be4ee14ff02..812f8d29151 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsSta
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateTimeSeriesStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterTimeSeriesStatement;
@@ -56,6 +57,20 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
     return context;
   }
 
+  @Override
+  public TSStatus visitLoadFile(
+      final LoadTsFileStatement loadTsFileStatement, final TSStatus context) {
+    if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
+        || context.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
+            && context.getMessage() != null
+            && context.getMessage().contains("memory")) {
+      return new TSStatus(
+              
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
+    return super.visitLoadFile(loadTsFileStatement, context);
+  }
+
   @Override
   public TSStatus visitInsertTablet(
       final InsertTabletStatement insertTabletStatement, final TSStatus 
context) {
@@ -82,7 +97,11 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
 
   private TSStatus visitInsertBase(
       final InsertBaseStatement insertBaseStatement, final TSStatus context) {
-    if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
+    if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()) {
+      return new TSStatus(
+              
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     } else if (context.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
index eff5f790bdb..f034a4627b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
@@ -64,12 +64,12 @@ public class LoadTsFileMemoryManager {
 
     throw new LoadRuntimeOutOfMemoryException(
         String.format(
-            "forceAllocate: failed to allocate memory from query engine after 
%d retries, "
-                + "total query memory %s, available memory for load %s bytes, "
-                + "used memory size %d bytes, requested memory size %d bytes",
+            "forceAllocate: failed to allocate memory from query engine after 
%s retries, "
+                + "total query memory %s bytes, current available memory for 
load %s bytes, "
+                + "current load used memory size %s bytes, load requested 
memory size %s bytes",
             MEMORY_ALLOCATE_MAX_RETRIES,
-            
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators(),
-            LocalExecutionPlanner.getInstance().getFreeMemoryForOperators(),
+            QUERY_ENGINE_MEMORY_MANAGER.getAllocateMemoryForOperators(),
+            QUERY_ENGINE_MEMORY_MANAGER.getFreeMemoryForLoadTsFile(),
             usedMemorySizeInBytes.get(),
             sizeInBytes));
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 038514ee02f..84708e52143 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2825,12 +2825,20 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
 
   @Override
   public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, 
MPPQueryContext context) {
-    LoadTsfileAnalyzer loadTsfileAnalyzer =
-        new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, 
schemaFetcher);
-    try {
+    try (final LoadTsfileAnalyzer loadTsfileAnalyzer =
+        new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, 
schemaFetcher)) {
       return loadTsfileAnalyzer.analyzeFileByFile();
-    } finally {
-      loadTsfileAnalyzer.close();
+    } catch (final Exception e) {
+      final String exceptionMessage =
+          String.format(
+              "Failed to execute load tsfile statement %s. Detail: %s",
+              loadTsFileStatement,
+              e.getMessage() == null ? e.getClass().getName() : 
e.getMessage());
+      logger.warn(exceptionMessage, e);
+      final Analysis analysis = new Analysis();
+      analysis.setFinishQueryAfterAnalyze(true);
+      analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, 
exceptionMessage));
+      return analysis;
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
index 91ca66fc0d5..79e84b3400d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
@@ -82,7 +82,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -94,7 +93,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-public class LoadTsfileAnalyzer {
+public class LoadTsfileAnalyzer implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(LoadTsfileAnalyzer.class);
 
@@ -139,9 +138,10 @@ public class LoadTsfileAnalyzer {
   public Analysis analyzeFileByFile() {
     context.setQueryType(QueryType.WRITE);
 
+    Analysis analysis = new Analysis();
+
     // check if the system is read only
     if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
-      Analysis analysis = new Analysis();
       analysis.setFinishQueryAfterAnalyze(true);
       analysis.setFailStatus(
           RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY, 
LoadReadOnlyException.MESSAGE));
@@ -171,25 +171,17 @@ public class LoadTsfileAnalyzer {
               "Load - Analysis Stage: {}/{} tsfiles have been analyzed, 
progress: {}%",
               i + 1, tsfileNum, String.format("%.3f", (i + 1) * 100.00 / 
tsfileNum));
         }
-      } catch (IllegalArgumentException e) {
-        LOGGER.warn(
-            "Parse file {} to resource error, this TsFile maybe empty.", 
tsFile.getPath(), e);
-        throw new SemanticException(
-            String.format("TsFile %s is empty or incomplete.", 
tsFile.getPath()));
       } catch (AuthException e) {
         return createFailAnalysisForAuthException(e);
-      } catch (BufferUnderflowException e) {
-        LOGGER.warn(
-            "The file {} is not a valid tsfile. Please check the input file.", 
tsFile.getPath(), e);
-        throw new SemanticException(
-            String.format(
-                "The file %s is not a valid tsfile. Please check the input 
file.",
-                tsFile.getPath()));
       } catch (Exception e) {
-        LOGGER.warn("Parse file {} to resource error.", tsFile.getPath(), e);
-        throw new SemanticException(
+        final String exceptionMessage =
             String.format(
-                "Parse file %s to resource error, because %s", 
tsFile.getPath(), e.getMessage()));
+                "The file %s is not a valid tsfile. Please check the input 
file. Detail: %s",
+                tsFile.getPath(), e.getMessage() == null ? 
e.getClass().getName() : e.getMessage());
+        LOGGER.warn(exceptionMessage, e);
+        analysis.setFinishQueryAfterAnalyze(true);
+        
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, 
exceptionMessage));
+        return analysis;
       }
     }
 
@@ -197,16 +189,31 @@ public class LoadTsfileAnalyzer {
       schemaAutoCreatorAndVerifier.flush();
     } catch (AuthException e) {
       return createFailAnalysisForAuthException(e);
+    } catch (Exception e) {
+      final String exceptionMessage =
+          String.format(
+              "Auto create or verify schema error when executing statement %s. 
Detail: %s.",
+              loadTsFileStatement,
+              e.getMessage() == null ? e.getClass().getName() : 
e.getMessage());
+      LOGGER.warn(exceptionMessage, e);
+      analysis.setFinishQueryAfterAnalyze(true);
+      analysis.setFailStatus(
+          RpcUtils.getStatus(
+              TSStatusCode.LOAD_FILE_ERROR,
+              String.format(
+                  "Auto create or verify schema error when executing statement 
%s.",
+                  loadTsFileStatement)));
+      return analysis;
     }
 
     LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed.");
 
     // data partition will be queried in the scheduler
-    final Analysis analysis = new Analysis();
     analysis.setStatement(loadTsFileStatement);
     return analysis;
   }
 
+  @Override
   public void close() {
     schemaAutoCreatorAndVerifier.close();
   }
@@ -360,8 +367,7 @@ public class LoadTsfileAnalyzer {
      * This can only be invoked after all timeseries in the current tsfile 
have been processed.
      * Otherwise, the isAligned status may be wrong.
      */
-    public void flushAndClearDeviceIsAlignedCacheIfNecessary()
-        throws SemanticException, AuthException {
+    public void flushAndClearDeviceIsAlignedCacheIfNecessary() throws 
SemanticException {
       // avoid OOM when loading a tsfile with too many timeseries
       // or loading too many tsfiles at the same time
       schemaCache.clearDeviceIsAlignedCacheIfNecessary();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 4789834f35e..4db63ed1d49 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -52,12 +52,12 @@ public class LocalExecutionPlanner {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(LocalExecutionPlanner.class);
   private static final long ALLOCATE_MEMORY_FOR_OPERATORS;
-  private static final long MAX_REST_MEMORY_FOR_LOAD;
+  private static final long MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD;
 
   static {
     IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
     ALLOCATE_MEMORY_FOR_OPERATORS = CONFIG.getAllocateMemoryForOperators();
-    MAX_REST_MEMORY_FOR_LOAD =
+    MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD =
         (long)
             ((ALLOCATE_MEMORY_FOR_OPERATORS) * (1.0 - 
CONFIG.getMaxAllocateMemoryRatioForLoad()));
   }
@@ -69,6 +69,10 @@ public class LocalExecutionPlanner {
     return freeMemoryForOperators;
   }
 
+  public long getFreeMemoryForLoadTsFile() {
+    return freeMemoryForOperators - MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD;
+  }
+
   public static LocalExecutionPlanner getInstance() {
     return InstanceHolder.INSTANCE;
   }
@@ -211,7 +215,7 @@ public class LocalExecutionPlanner {
   }
 
   public synchronized boolean forceAllocateFreeMemoryForOperators(long 
memoryInBytes) {
-    if (freeMemoryForOperators - memoryInBytes <= MAX_REST_MEMORY_FOR_LOAD) {
+    if (freeMemoryForOperators - memoryInBytes <= 
MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD) {
       return false;
     } else {
       freeMemoryForOperators -= memoryInBytes;
@@ -220,9 +224,9 @@ public class LocalExecutionPlanner {
   }
 
   public synchronized long tryAllocateFreeMemoryForOperators(long 
memoryInBytes) {
-    if (freeMemoryForOperators - memoryInBytes <= MAX_REST_MEMORY_FOR_LOAD) {
-      long result = freeMemoryForOperators - MAX_REST_MEMORY_FOR_LOAD;
-      freeMemoryForOperators = MAX_REST_MEMORY_FOR_LOAD;
+    if (freeMemoryForOperators - memoryInBytes <= 
MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD) {
+      long result = freeMemoryForOperators - 
MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD;
+      freeMemoryForOperators = MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD;
       return result;
     } else {
       freeMemoryForOperators -= memoryInBytes;

Reply via email to