This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c10c95f25d8 load-tsfile script: disable redirection & load: handle
exceptions using the Analysis objects instead of throwing exceptions & pipe:
handle SYSTEM_READ_ONLY correctly on receiver side (#12716)
c10c95f25d8 is described below
commit c10c95f25d8ba558573e41cfdce55b2521712e4f
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jun 12 18:43:53 2024 +0800
load-tsfile script: disable redirection & load: handle exceptions using the
Analysis objects instead of throwing exceptions & pipe: handle SYSTEM_READ_ONLY
correctly on receiver side (#12716)
---
.../java/org/apache/iotdb/tool/ImportTsFile.java | 4 +-
.../visitor/PipeStatementExceptionVisitor.java | 2 +-
.../visitor/PipeStatementTSStatusVisitor.java | 21 +++++++++-
.../queryengine/load/LoadTsFileMemoryManager.java | 10 ++---
.../queryengine/plan/analyze/AnalyzeVisitor.java | 18 +++++---
.../plan/analyze/LoadTsfileAnalyzer.java | 48 ++++++++++++----------
.../plan/planner/LocalExecutionPlanner.java | 16 +++++---
7 files changed, 79 insertions(+), 40 deletions(-)
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
index 4e74dbe5f59..320b03334df 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
@@ -310,7 +310,9 @@ public class ImportTsFile extends AbstractTsFileTool {
}
sessionPool =
- new SessionPool(host, Integer.parseInt(port), username, password, threadNum + 1);
+ new SessionPool(
+ host, Integer.parseInt(port), username, password, threadNum + 1, false, false);
+ sessionPool.setEnableQueryRedirection(false);
traverseAndCollectFiles(file);
addNoResourceOrModsToQueue();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
index 7cbe5bed416..5a65b1a5e05 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
@@ -61,7 +61,7 @@ public class PipeStatementExceptionVisitor extends
StatementVisitor<TSStatus, Ex
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
- return super.visitLoadFile(loadTsFileStatement, context);
+ return visitStatement(loadTsFileStatement, context);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index be4ee14ff02..812f8d29151 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsSta
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateTimeSeriesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterTimeSeriesStatement;
@@ -56,6 +57,20 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
return context;
}
+ @Override
+ public TSStatus visitLoadFile(
+ final LoadTsFileStatement loadTsFileStatement, final TSStatus context) {
+ if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
+ || context.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
+ && context.getMessage() != null
+ && context.getMessage().contains("memory")) {
+ return new TSStatus(
+ TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
+ }
+ return super.visitLoadFile(loadTsFileStatement, context);
+ }
+
@Override
public TSStatus visitInsertTablet(
final InsertTabletStatement insertTabletStatement, final TSStatus
context) {
@@ -82,7 +97,11 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
private TSStatus visitInsertBase(
final InsertBaseStatement insertBaseStatement, final TSStatus context) {
- if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
+ if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()) {
+ return new TSStatus(
+ TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
+ } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() ==
TSStatusCode.METADATA_ERROR.getStatusCode()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
index eff5f790bdb..f034a4627b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
@@ -64,12 +64,12 @@ public class LoadTsFileMemoryManager {
throw new LoadRuntimeOutOfMemoryException(
String.format(
- "forceAllocate: failed to allocate memory from query engine after %d retries, "
- + "total query memory %s, available memory for load %s bytes, "
- + "used memory size %d bytes, requested memory size %d bytes",
+ "forceAllocate: failed to allocate memory from query engine after %s retries, "
+ + "total query memory %s bytes, current available memory for load %s bytes, "
+ + "current load used memory size %s bytes, load requested memory size %s bytes",
MEMORY_ALLOCATE_MAX_RETRIES,
- IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators(),
- LocalExecutionPlanner.getInstance().getFreeMemoryForOperators(),
+ QUERY_ENGINE_MEMORY_MANAGER.getAllocateMemoryForOperators(),
+ QUERY_ENGINE_MEMORY_MANAGER.getFreeMemoryForLoadTsFile(),
usedMemorySizeInBytes.get(),
sizeInBytes));
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 038514ee02f..84708e52143 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2825,12 +2825,20 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
@Override
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement,
MPPQueryContext context) {
- LoadTsfileAnalyzer loadTsfileAnalyzer =
- new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher);
- try {
+ try (final LoadTsfileAnalyzer loadTsfileAnalyzer =
+ new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher)) {
return loadTsfileAnalyzer.analyzeFileByFile();
- } finally {
- loadTsfileAnalyzer.close();
+ } catch (final Exception e) {
+ final String exceptionMessage =
+ String.format(
+ "Failed to execute load tsfile statement %s. Detail: %s",
+ loadTsFileStatement,
+ e.getMessage() == null ? e.getClass().getName() : e.getMessage());
+ logger.warn(exceptionMessage, e);
+ final Analysis analysis = new Analysis();
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage));
+ return analysis;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
index 91ca66fc0d5..79e84b3400d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
@@ -82,7 +82,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.nio.BufferUnderflowException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -94,7 +93,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-public class LoadTsfileAnalyzer {
+public class LoadTsfileAnalyzer implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsfileAnalyzer.class);
@@ -139,9 +138,10 @@ public class LoadTsfileAnalyzer {
public Analysis analyzeFileByFile() {
context.setQueryType(QueryType.WRITE);
+ Analysis analysis = new Analysis();
+
// check if the system is read only
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
- Analysis analysis = new Analysis();
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(
RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY,
LoadReadOnlyException.MESSAGE));
@@ -171,25 +171,17 @@ public class LoadTsfileAnalyzer {
"Load - Analysis Stage: {}/{} tsfiles have been analyzed,
progress: {}%",
i + 1, tsfileNum, String.format("%.3f", (i + 1) * 100.00 /
tsfileNum));
}
- } catch (IllegalArgumentException e) {
- LOGGER.warn(
- "Parse file {} to resource error, this TsFile maybe empty.", tsFile.getPath(), e);
- throw new SemanticException(
- String.format("TsFile %s is empty or incomplete.", tsFile.getPath()));
} catch (AuthException e) {
return createFailAnalysisForAuthException(e);
- } catch (BufferUnderflowException e) {
- LOGGER.warn(
- "The file {} is not a valid tsfile. Please check the input file.", tsFile.getPath(), e);
- throw new SemanticException(
- String.format(
- "The file %s is not a valid tsfile. Please check the input file.",
- tsFile.getPath()));
} catch (Exception e) {
- LOGGER.warn("Parse file {} to resource error.", tsFile.getPath(), e);
- throw new SemanticException(
+ final String exceptionMessage =
String.format(
- "Parse file %s to resource error, because %s", tsFile.getPath(), e.getMessage()));
+ "The file %s is not a valid tsfile. Please check the input file. Detail: %s",
+ tsFile.getPath(), e.getMessage() == null ? e.getClass().getName() : e.getMessage());
+ LOGGER.warn(exceptionMessage, e);
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage));
+ return analysis;
}
}
@@ -197,16 +189,31 @@ public class LoadTsfileAnalyzer {
schemaAutoCreatorAndVerifier.flush();
} catch (AuthException e) {
return createFailAnalysisForAuthException(e);
+ } catch (Exception e) {
+ final String exceptionMessage =
+ String.format(
+ "Auto create or verify schema error when executing statement %s. Detail: %s.",
+ loadTsFileStatement,
+ e.getMessage() == null ? e.getClass().getName() : e.getMessage());
+ LOGGER.warn(exceptionMessage, e);
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.LOAD_FILE_ERROR,
+ String.format(
+ "Auto create or verify schema error when executing statement %s.",
+ loadTsFileStatement)));
+ return analysis;
}
LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed.");
// data partition will be queried in the scheduler
- final Analysis analysis = new Analysis();
analysis.setStatement(loadTsFileStatement);
return analysis;
}
+ @Override
public void close() {
schemaAutoCreatorAndVerifier.close();
}
@@ -360,8 +367,7 @@ public class LoadTsfileAnalyzer {
* This can only be invoked after all timeseries in the current tsfile
have been processed.
* Otherwise, the isAligned status may be wrong.
*/
- public void flushAndClearDeviceIsAlignedCacheIfNecessary()
- throws SemanticException, AuthException {
+ public void flushAndClearDeviceIsAlignedCacheIfNecessary() throws SemanticException {
// avoid OOM when loading a tsfile with too many timeseries
// or loading too many tsfiles at the same time
schemaCache.clearDeviceIsAlignedCacheIfNecessary();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 4789834f35e..4db63ed1d49 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -52,12 +52,12 @@ public class LocalExecutionPlanner {
private static final Logger LOGGER =
LoggerFactory.getLogger(LocalExecutionPlanner.class);
private static final long ALLOCATE_MEMORY_FOR_OPERATORS;
- private static final long MAX_REST_MEMORY_FOR_LOAD;
+ private static final long MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD;
static {
IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
ALLOCATE_MEMORY_FOR_OPERATORS = CONFIG.getAllocateMemoryForOperators();
- MAX_REST_MEMORY_FOR_LOAD =
+ MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD =
(long)
((ALLOCATE_MEMORY_FOR_OPERATORS) * (1.0 -
CONFIG.getMaxAllocateMemoryRatioForLoad()));
}
@@ -69,6 +69,10 @@ public class LocalExecutionPlanner {
return freeMemoryForOperators;
}
+ public long getFreeMemoryForLoadTsFile() {
+ return freeMemoryForOperators - MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD;
+ }
+
public static LocalExecutionPlanner getInstance() {
return InstanceHolder.INSTANCE;
}
@@ -211,7 +215,7 @@ public class LocalExecutionPlanner {
}
public synchronized boolean forceAllocateFreeMemoryForOperators(long
memoryInBytes) {
- if (freeMemoryForOperators - memoryInBytes <= MAX_REST_MEMORY_FOR_LOAD) {
+ if (freeMemoryForOperators - memoryInBytes <= MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD) {
return false;
} else {
freeMemoryForOperators -= memoryInBytes;
@@ -220,9 +224,9 @@ public class LocalExecutionPlanner {
}
public synchronized long tryAllocateFreeMemoryForOperators(long
memoryInBytes) {
- if (freeMemoryForOperators - memoryInBytes <= MAX_REST_MEMORY_FOR_LOAD) {
- long result = freeMemoryForOperators - MAX_REST_MEMORY_FOR_LOAD;
- freeMemoryForOperators = MAX_REST_MEMORY_FOR_LOAD;
+ if (freeMemoryForOperators - memoryInBytes <= MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD) {
+ long result = freeMemoryForOperators - MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD;
+ freeMemoryForOperators = MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD;
return result;
} else {
freeMemoryForOperators -= memoryInBytes;