This is an automated email from the ASF dual-hosted git repository. Wei-hao-Li pushed a commit to branch IoTDBLocal in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 794cc603cc3d06efa5fc7e66fa9f4b939d3b6451 Author: Weihao Li <[email protected]> AuthorDate: Wed Jun 24 11:48:15 2026 +0800 modify some Signed-off-by: Weihao Li <[email protected]> --- .../java/org/apache/iotdb/udf/api/IoTDBLocal.java | 5 +- .../relational/ColumnTransformerBuilder.java | 45 ++++--- .../calc/plan/planner/TableOperatorGenerator.java | 53 +++++---- .../udf/UserDefineScalarFunctionTransformer.java | 34 +++++- .../iotdb/db/protocol/session/SessionManager.java | 17 +++ .../protocol/thrift/impl/ClientRPCServiceImpl.java | 2 +- .../iotdb/db/queryengine/plan/Coordinator.java | 33 +----- .../planner/DataNodeTableOperatorGenerator.java | 40 ++----- .../db/queryengine/udf/InternalQueryExecutor.java | 129 ++++++++++----------- .../db/queryengine/udf/InternalQueryResult.java | 32 ++++- .../iotdb/db/queryengine/udf/IoTDBLocalImpl.java | 111 ++++++------------ .../udf/ScalarUdfExpressionDetector.java | 58 --------- .../iotdb/db/queryengine/udf/UDFResultSetImpl.java | 28 +++-- 13 files changed, 264 insertions(+), 323 deletions(-) diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java index 0c96f919fc1..b84cb45965a 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java @@ -65,9 +65,6 @@ public interface IoTDBLocal { /** Log at ERROR level with exception stack. */ void error(String msg, Throwable t); - /** - * Release internal session and other resources. Called by the framework after beforeDestroy - * method. - */ + /** Close internal session. Called by the framework after beforeDestroy method. */ void close(); } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java index 782222a1394..ba0464b0777 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java @@ -20,6 +20,7 @@ package org.apache.iotdb.calc.execution.relational; import org.apache.iotdb.calc.i18n.CalcMessages; +import org.apache.iotdb.calc.plan.planner.TableOperatorGenerator.IoTDBLocalFactory; import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager; import org.apache.iotdb.calc.plan.relational.metadata.ITypeMetadata; import org.apache.iotdb.calc.transformation.dag.column.ColumnTransformer; @@ -198,7 +199,6 @@ import org.apache.iotdb.commons.queryengine.plan.relational.type.TypeNotFoundExc import org.apache.iotdb.commons.queryengine.plan.udf.TableUDFUtils; import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; -import org.apache.iotdb.udf.api.IoTDBLocal; import org.apache.iotdb.udf.api.customizer.analysis.ScalarFunctionAnalysis; import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments; import org.apache.iotdb.udf.api.relational.ScalarFunction; @@ -1490,16 +1490,10 @@ public class ColumnTransformerBuilder .collect(Collectors.toList()), Collections.emptyMap()); ScalarFunctionAnalysis analysis = scalarFunction.analyze(parameters); - Optional<IoTDBLocal> ioTDBLocal = context.getIoTDBLocal(); - if (ioTDBLocal.isPresent()) { - scalarFunction.beforeStart(parameters, ioTDBLocal.get()); - } else { - scalarFunction.beforeStart(parameters); - } Type returnType = UDFDataTypeTransformer.transformUDFDataTypeToReadType(analysis.getOutputDataType()); return new UserDefineScalarFunctionTransformer( - returnType, scalarFunction, childrenColumnTransformer, ioTDBLocal.orElse(null)); + returnType, scalarFunction, childrenColumnTransformer, parameters, context); } } throw new IllegalArgumentException( @@ -1960,7 +1954,11 @@ public class ColumnTransformerBuilder @SuppressWarnings("unused") private final Optional<MemoryReservationManager> memoryReservationManager; - private final Optional<IoTDBLocal> ioTDBLocal; + private final String fragmentInstanceId; + + private final String outerQueryId; + + @Nullable private final IoTDBLocalFactory ioTDBLocalFactory; public Context( SessionInfo sessionInfo, @@ -1986,7 +1984,9 @@ public class ColumnTransformerBuilder typeProvider, metadata, memoryReservationManager, - Optional.empty()); + null, + null, + null); } public Context( @@ -2001,7 +2001,9 @@ public class ColumnTransformerBuilder ITableTypeProvider typeProvider, ITypeMetadata metadata, @Nullable MemoryReservationManager memoryReservationManager, - Optional<IoTDBLocal> ioTDBLocal) { + String fragmentInstanceId, + String outerQueryId, + @Nullable IoTDBLocalFactory ioTDBLocalFactory) { this.sessionInfo = sessionInfo; this.leafList = leafList; this.inputLocations = inputLocations; @@ -2013,11 +2015,26 @@ public class ColumnTransformerBuilder this.typeProvider = typeProvider; this.metadata = metadata; this.memoryReservationManager = Optional.ofNullable(memoryReservationManager); - this.ioTDBLocal = ioTDBLocal; + this.fragmentInstanceId = fragmentInstanceId; + this.outerQueryId = outerQueryId; + this.ioTDBLocalFactory = ioTDBLocalFactory; + } + + public SessionInfo getSessionInfo() { + return sessionInfo; + } + + public String getFragmentInstanceId() { + return fragmentInstanceId; + } + + public String getOuterQueryId() { + return outerQueryId; } - public Optional<IoTDBLocal> getIoTDBLocal() { - return ioTDBLocal; + @Nullable + public IoTDBLocalFactory getIoTDBLocalFactory() { + return ioTDBLocalFactory; } public Type getType(SymbolReference symbolReference) { diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java index 8b7f6cb714b..6454bd38d47 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java @@ -314,30 +314,10 @@ public abstract class TableOperatorGenerator< Map<Symbol, List<InputLocation>> inputLocations, PlanNodeId planNodeId, C context) { - return constructFilterAndProjectOperator( - predicate, - inputOperator, - projectExpressions, - inputDataTypes, - inputLocations, - planNodeId, - context, - getIoTDBLocal(context, planNodeId)); - } - protected Optional<IoTDBLocal> getIoTDBLocal(C context, PlanNodeId planNodeId) { - return Optional.empty(); - } - - protected Operator constructFilterAndProjectOperator( - Optional<Expression> predicate, - Operator inputOperator, - Expression[] projectExpressions, - List<TSDataType> inputDataTypes, - Map<Symbol, List<InputLocation>> inputLocations, - PlanNodeId planNodeId, - C context, - Optional<IoTDBLocal> ioTDBLocal) { + String fragmentInstanceId = getFragmentInstanceId(context); + String outerQueryId = getQueryId(context); + IoTDBLocalFactory ioTDBLocalFactory = getIoTDBLocalFactory(context); final List<TSDataType> filterOutputDataTypes = new ArrayList<>(inputDataTypes); @@ -367,7 +347,9 @@ public abstract class TableOperatorGenerator< context.getTableTypeProvider(), metadata, context.getMemoryReservationManager(), - ioTDBLocal); + fragmentInstanceId, + outerQueryId, + ioTDBLocalFactory); return visitor.process(p, filterColumnTransformerContext); }) @@ -396,7 +378,9 @@ public abstract class TableOperatorGenerator< context.getTableTypeProvider(), metadata, context.getMemoryReservationManager(), - ioTDBLocal); + fragmentInstanceId, + outerQueryId, + ioTDBLocalFactory); for (Expression expression : projectExpressions) { projectOutputTransformerList.add( @@ -420,6 +404,18 @@ public abstract class TableOperatorGenerator< predicate.isPresent()); } + protected String getFragmentInstanceId(C context) { + return null; + } + + protected String getQueryId(C context) { + return null; + } + + protected IoTDBLocalFactory getIoTDBLocalFactory(C context) { + return null; + } + @Override public Operator visitProject(ProjectNode node, C context) { ITableTypeProvider typeProvider = context.getTableTypeProvider(); @@ -2494,4 +2490,11 @@ public abstract class TableOperatorGenerator< } protected abstract SessionInfo getSessionInfo(C context); + + /** Factory for creating {@link IoTDBLocal} inside UDF column transformers. */ + @FunctionalInterface + public interface IoTDBLocalFactory { + + IoTDBLocal create(SessionInfo sessionInfo, String fragmentInstanceId, String queryId); + } } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java index dd7c175774d..ab2f99031d2 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java @@ -20,9 +20,13 @@ package org.apache.iotdb.calc.transformation.dag.column.udf; import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.RecordIterator; +import org.apache.iotdb.calc.execution.relational.ColumnTransformerBuilder; +import org.apache.iotdb.calc.plan.planner.TableOperatorGenerator.IoTDBLocalFactory; import org.apache.iotdb.calc.transformation.dag.column.ColumnTransformer; import org.apache.iotdb.calc.transformation.dag.column.multi.MultiColumnTransformer; import org.apache.iotdb.udf.api.IoTDBLocal; +import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments; +import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.ScalarFunction; import org.apache.iotdb.udf.api.relational.access.Record; @@ -43,12 +47,36 @@ public class UserDefineScalarFunctionTransformer extends MultiColumnTransformer Type returnType, ScalarFunction scalarFunction, List<ColumnTransformer> childrenTransformers, - IoTDBLocal ioTDBLocal) { + FunctionArguments parameters, + ColumnTransformerBuilder.Context context) { super(returnType, childrenTransformers); this.scalarFunction = scalarFunction; - this.ioTDBLocal = ioTDBLocal; + this.ioTDBLocal = createIoTDBLocal(context); this.inputTypes = childrenTransformers.stream().map(ColumnTransformer::getType).collect(Collectors.toList()); + try { + if (ioTDBLocal != null) { + scalarFunction.beforeStart(parameters, ioTDBLocal); + } else { + scalarFunction.beforeStart(parameters); + } + } catch (UDFException e) { + throw new RuntimeException( + "Error occurs when starting user-defined scalar function " + + scalarFunction.getClass().getName(), + e); + } + } + + private static IoTDBLocal createIoTDBLocal(ColumnTransformerBuilder.Context context) { + IoTDBLocalFactory factory = context.getIoTDBLocalFactory(); + if (factory == null + || context.getFragmentInstanceId() == null + || context.getOuterQueryId() == null) { + return null; + } + return factory.create( + context.getSessionInfo(), context.getFragmentInstanceId(), context.getOuterQueryId()); } @Override @@ -109,8 +137,8 @@ public class UserDefineScalarFunctionTransformer extends MultiColumnTransformer public void close() { super.close(); if (ioTDBLocal != null) { - scalarFunction.beforeDestroy(ioTDBLocal); ioTDBLocal.close(); + scalarFunction.beforeDestroy(ioTDBLocal); } else { scalarFunction.beforeDestroy(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java index b3fa3c049f3..04ddf7f4b9f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java @@ -369,6 +369,23 @@ public class SessionManager implements SessionManagerMBean { return currSession.get(); } + /** + * Swap the ThreadLocal session and return the previous one. Used by UDF internal queries to + * temporarily install an internal session without removing the previous session from the session + * map. + */ + public IClientSession exchangeCurrSession(IClientSession session) { + IClientSession previous = currSession.get(); + if (session != null) { + currSession.set(session); + sessions.putIfAbsent(session, placeHolder); + } else { + currSession.remove(); + currSessionIdleTime.remove(); + } + return previous; + } + /** get current session and update session idle time. */ public IClientSession getCurrSessionAndUpdateIdleTime() { IClientSession clientSession = getCurrSession(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 1aa14e053ea..d940d226096 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -562,7 +562,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } - private static void clearUp( + public static void clearUp( IClientSession clientSession, Long statementId, Long queryId, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 45b8643d02c..402424d1f69 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; @@ -29,7 +30,6 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; -import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.SemanticException; import org.apache.iotdb.commons.memory.IMemoryBlock; import org.apache.iotdb.commons.memory.MemoryBlockType; @@ -318,8 +318,7 @@ public class Coordinator { boolean userQuery, boolean debug, boolean readOnlyInternalQuery, - BiFunction<MPPQueryContext, Long, IQueryExecution> iQueryExecutionFactory) - throws IoTDBException { + BiFunction<MPPQueryContext, Long, IQueryExecution> iQueryExecutionFactory) { long startTime = System.currentTimeMillis(); QueryId globalQueryId = queryIdGenerator.createNextQueryId(); MPPQueryContext queryContext = null; @@ -504,40 +503,12 @@ public class Coordinator { long timeOut, boolean userQuery, boolean debug) { - return executeForTableModel( - statement, - sqlParser, - clientSession, - queryId, - session, - sql, - metadata, - timeOut, - userQuery, - debug, - false); - } - - public ExecutionResult executeForTableModel( - org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement statement, - SqlParser sqlParser, - IClientSession clientSession, - long queryId, - SessionInfo session, - String sql, - Metadata metadata, - long timeOut, - boolean userQuery, - boolean debug, - boolean readOnlyInternalQuery) - throws IoTDBException { return execution( queryId, session, sql, userQuery, debug, - readOnlyInternalQuery, ((queryContext, startTime) -> createQueryExecutionForTableModel( statement, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index 9a2e35a8c1e..97383309a48 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -28,6 +28,7 @@ import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.La import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAggregator; import org.apache.iotdb.calc.execution.relational.ColumnTransformerBuilder; import org.apache.iotdb.calc.plan.planner.TableOperatorGenerator; +import org.apache.iotdb.calc.plan.planner.TableOperatorGenerator.IoTDBLocalFactory; import org.apache.iotdb.calc.transformation.dag.column.leaf.LeafColumnTransformer; import org.apache.iotdb.calc.transformation.dag.column.unary.scalar.DateBinFunctionColumnTransformer; import org.apache.iotdb.common.rpc.thrift.TEndPoint; @@ -68,6 +69,7 @@ import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle; import org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle; import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager; +import org.apache.iotdb.db.queryengine.execution.operator.EmptyDataOperator; import org.apache.iotdb.db.queryengine.execution.operator.ExplainAnalyzeOperator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.process.TableIntoOperator; @@ -132,14 +134,13 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.Table import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryScanNode; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.queryengine.udf.IoTDBLocalImpl; -import org.apache.iotdb.db.queryengine.udf.ScalarUdfExpressionDetector; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils; -import org.apache.iotdb.udf.api.IoTDBLocal; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.idcolumn.FourOrHigherLevelDBExtractor; @@ -2104,34 +2105,17 @@ public class DataNodeTableOperatorGenerator } @Override - protected Optional<IoTDBLocal> getIoTDBLocal( - LocalExecutionPlanContext context, PlanNodeId planNodeId) { - return Optional.of( - IoTDBLocalImpl.create( - getSessionInfo(context), context.getFragmentInstanceId(), planNodeId)); + protected String getFragmentInstanceId(LocalExecutionPlanContext context) { + return context.getFragmentInstanceId().getFullId(); } @Override - protected Operator constructFilterAndProjectOperator( - Optional<Expression> predicate, - Operator inputOperator, - Expression[] projectExpressions, - List<TSDataType> inputDataTypes, - Map<Symbol, List<InputLocation>> inputLocations, - PlanNodeId planNodeId, - LocalExecutionPlanContext context) { - Optional<IoTDBLocal> ioTDBLocal = - ScalarUdfExpressionDetector.contains(predicate, projectExpressions) - ? getIoTDBLocal(context, planNodeId) - : Optional.empty(); - return constructFilterAndProjectOperator( - predicate, - inputOperator, - projectExpressions, - inputDataTypes, - inputLocations, - planNodeId, - context, - ioTDBLocal); + protected String getQueryId(LocalExecutionPlanContext context) { + return context.getFragmentInstanceId().getQueryId().getId(); + } + + @Override + protected IoTDBLocalFactory getIoTDBLocalFactory(LocalExecutionPlanContext context) { + return IoTDBLocalImpl.FACTORY; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java index 5a31c5910f7..328542d05c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java @@ -20,12 +20,14 @@ package org.apache.iotdb.db.queryengine.udf; import org.apache.iotdb.commons.exception.IoTDBException; -import org.apache.iotdb.commons.exception.QueryTimeoutException; import org.apache.iotdb.commons.exception.SemanticException; import org.apache.iotdb.commons.queryengine.common.SessionInfo; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement; import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.InternalClientSession; import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl; +import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; @@ -45,82 +47,79 @@ public final class InternalQueryExecutor { private InternalQueryExecutor() {} - public static long computeRemainingTimeoutMs( - long outerQueryStartTimeMs, long outerQueryTimeoutMs) { - return outerQueryTimeoutMs - (System.currentTimeMillis() - outerQueryStartTimeMs); - } - public static InternalQueryResult executeInternalQuery( - IClientSession internalSession, SessionInfo sessionInfo, + String fragmentInstanceId, + QueryId outerQueryId, String sql, - long outerQueryStartTimeMs, - long outerQueryTimeoutMs) + long timeoutMs) throws IoTDBException { - long timeoutMs = computeRemainingTimeoutMs(outerQueryStartTimeMs, outerQueryTimeoutMs); - if (timeoutMs <= 0) { - throw new QueryTimeoutException( - "Outer query timeout exceeded before UDF internal query starts"); - } + IClientSession previousSession = SESSION_MANAGER.getCurrSession(); - Statement parsedStatement = parseTableStatement(internalSession, sessionInfo, sql); - - long statementId = SESSION_MANAGER.requestStatementId(internalSession); - long queryId = SESSION_MANAGER.requestQueryId(internalSession, statementId); - - ExecutionResult result = - COORDINATOR.executeForTableModel( - parsedStatement, - RELATION_SQL_PARSER, - internalSession, - queryId, - sessionInfo, - sql, - METADATA, - timeoutMs, - false, - false, - true); - - if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - COORDINATOR.cleanupQueryExecution(queryId, () -> sql, null); - internalSession.removeQueryId(statementId, queryId); - throw new IoTDBException(result.status.message, result.status.code); - } + InternalClientSession internalSession = + new InternalClientSession(formatInternalClientId(fragmentInstanceId, outerQueryId)); + internalSession.setSqlDialect(sessionInfo.getSqlDialect()); + sessionInfo.getDatabaseName().ifPresent(internalSession::setDatabaseName); - IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); - if (queryExecution == null) { - COORDINATOR.cleanupQueryExecution(queryId, () -> sql, null); - internalSession.removeQueryId(statementId, queryId); - throw new IoTDBException( - "Internal query execution not found", TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); - } + SESSION_MANAGER.supplySession( + internalSession, + sessionInfo.getUserId(), + sessionInfo.getUserName(), + sessionInfo.getZoneId(), + sessionInfo.getVersion()); - return new InternalQueryResult( - queryExecution, - () -> { - COORDINATOR.cleanupQueryExecution(queryId, () -> sql, null); - internalSession.removeQueryId(statementId, queryId); - }); - } + long statementId = -1; + long queryId = -1; + try { + SESSION_MANAGER.exchangeCurrSession(internalSession); - static void validateReadOnlyQuery(IQueryExecution execution) throws IoTDBException { - if (execution.getQueryType() != QueryType.READ) { - execution.stopAndCleanup(null); - throw new SemanticException("Only query is allowed when used IoTDBLocal in UDF"); - } - } + statementId = SESSION_MANAGER.requestStatementId(internalSession); + queryId = SESSION_MANAGER.requestQueryId(internalSession, statementId); - private static Statement parseTableStatement( - IClientSession internalSession, SessionInfo sessionInfo, String sql) throws IoTDBException { - try { - Statement statement = + Statement parsedStatement = RELATION_SQL_PARSER.createStatement(sql, sessionInfo.getZoneId(), internalSession); - return statement; + + ExecutionResult result = + COORDINATOR.executeForTableModel( + parsedStatement, + RELATION_SQL_PARSER, + internalSession, + queryId, + sessionInfo, + sql, + METADATA, + timeoutMs, + false, + false); + + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new IoTDBException(result.status.message, result.status.code); + } + + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + if (queryExecution == null) { + throw new IoTDBException( + "Internal query execution not found", + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + } + + return new InternalQueryResult(queryExecution, internalSession, statementId, queryId, sql); } catch (Exception e) { - throw new IoTDBException(e.getMessage(), TSStatusCode.SQL_PARSE_ERROR.getStatusCode()); + ClientRPCServiceImpl.clearUp(internalSession, statementId, queryId, () -> sql, e); + throw new IoTDBException(e.getMessage(), TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + } finally { + SESSION_MANAGER.exchangeCurrSession(previousSession); + } + } + + static String formatInternalClientId(String fragmentInstanceId, QueryId outerQueryId) { + return String.format("udf-local-%s-%s", fragmentInstanceId, outerQueryId); + } + + public static void validateReadOnlyQuery(IQueryExecution execution) { + if (execution.getQueryType() != QueryType.READ) { + throw new SemanticException("Only query is supported for IoTDBLocal query interface"); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java index 9f35fa95618..419ea00eb00 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java @@ -19,25 +19,47 @@ package org.apache.iotdb.db.queryengine.udf; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl; +import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; -/** Internal query result holding {@link IQueryExecution} and a cleanup action. */ +/** Internal query result holding {@link IQueryExecution} and cleanup metadata. */ public final class InternalQueryResult implements AutoCloseable { private final IQueryExecution queryExecution; - private final Runnable releaseAction; + private final IClientSession internalSession; + private final long statementId; + private final long queryId; + private final String sql; - public InternalQueryResult(IQueryExecution queryExecution, Runnable releaseAction) { + public InternalQueryResult( + IQueryExecution queryExecution, + IClientSession internalSession, + long statementId, + long queryId, + String sql) { this.queryExecution = queryExecution; - this.releaseAction = releaseAction; + this.internalSession = internalSession; + this.statementId = statementId; + this.queryId = queryId; + this.sql = sql; } public IQueryExecution getQueryExecution() { return queryExecution; } + public long getQueryId() { + return queryId; + } + + public DatasetHeader getDatasetHeader() { + return queryExecution.getDatasetHeader(); + } + @Override public void close() { - releaseAction.run(); + ClientRPCServiceImpl.clearUp(internalSession, statementId, queryId, () -> sql, null); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java index 32540b20e06..47ee7d07f27 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java @@ -19,13 +19,12 @@ package org.apache.iotdb.db.queryengine.udf; -import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; +import org.apache.iotdb.calc.plan.planner.TableOperatorGenerator.IoTDBLocalFactory; import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.QueryTimeoutException; import org.apache.iotdb.commons.queryengine.common.SessionInfo; -import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.protocol.session.InternalClientSession; -import org.apache.iotdb.db.protocol.session.SessionManager; -import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.udf.api.IoTDBLocal; @@ -41,74 +40,35 @@ import java.util.List; /** Server-side implementation of {@link IoTDBLocal}. */ public class IoTDBLocalImpl implements IoTDBLocal { + public static final IoTDBLocalFactory FACTORY = IoTDBLocalImpl::new; + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLocalImpl.class); - private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); private static final Coordinator COORDINATOR = Coordinator.getInstance(); private final SessionInfo sessionInfo; - private final InternalClientSession internalSession; - private final long outerQueryStartTimeMs; - private final long outerQueryTimeoutMs; + private final String fragmentInstanceId; + private final QueryId outerQueryId; private final List<UDFResultSetImpl> openResultSets = new ArrayList<>(); - private boolean closed; - public IoTDBLocalImpl( - SessionInfo sessionInfo, - String internalClientId, - long outerQueryStartTimeMs, - long outerQueryTimeoutMs) { + public IoTDBLocalImpl(SessionInfo sessionInfo, String fragmentInstanceId, String outerQueryId) { this.sessionInfo = sessionInfo; - this.outerQueryStartTimeMs = outerQueryStartTimeMs; - this.outerQueryTimeoutMs = outerQueryTimeoutMs; - this.internalSession = new InternalClientSession(internalClientId); - internalSession.setSqlDialect(sessionInfo.getSqlDialect()); - sessionInfo.getDatabaseName().ifPresent(internalSession::setDatabaseName); - SESSION_MANAGER.supplySession( - internalSession, - sessionInfo.getUserId(), - sessionInfo.getUserName(), - sessionInfo.getZoneId(), - ClientVersion.V_1_0); - } - - public static String formatInternalClientId( - FragmentInstanceId fragmentInstanceId, PlanNodeId planNodeId) { - return "udf-local-" + fragmentInstanceId + "-" + planNodeId; - } - - public static IoTDBLocalImpl create( - SessionInfo sessionInfo, FragmentInstanceId fragmentInstanceId, PlanNodeId planNodeId) { - long outerStart = System.currentTimeMillis(); - long outerTimeout = - org.apache.iotdb.db.conf.IoTDBDescriptor.getInstance() - .getConfig() - .getQueryTimeoutThreshold(); - String globalQueryId = fragmentInstanceId.getQueryId().getId(); - for (IQueryExecution execution : COORDINATOR.getAllQueryExecutions()) { - if (globalQueryId.equals(execution.getQueryId())) { - outerStart = execution.getStartExecutionTime(); - outerTimeout = execution.getTimeout(); - break; - } - } - return new IoTDBLocalImpl( - sessionInfo, - formatInternalClientId(fragmentInstanceId, planNodeId), - outerStart, - outerTimeout); + this.fragmentInstanceId = fragmentInstanceId; + this.outerQueryId = QueryId.valueOf(outerQueryId); } @Override public UDFResultSet query(String sql) throws UDFException { - if (closed) { - throw new UDFException("IoTDBLocal is already closed"); - } try { + long timeoutMs = computeRemainingTimeoutMs(); + if (timeoutMs <= 0) { + throw new QueryTimeoutException( + "Outer query timeout exceeded before IoTDBLocal query starts"); + } InternalQueryResult result = InternalQueryExecutor.executeInternalQuery( - internalSession, sessionInfo, sql, outerQueryStartTimeMs, outerQueryTimeoutMs); + sessionInfo, fragmentInstanceId, outerQueryId, sql, timeoutMs); int index = openResultSets.size(); - UDFResultSetImpl rs = new UDFResultSetImpl(result, this, index); + UDFResultSetImpl rs = new UDFResultSetImpl(openResultSets, index, result); openResultSets.add(rs); return rs; } catch (IoTDBException e) { @@ -116,29 +76,32 @@ public class IoTDBLocalImpl implements IoTDBLocal { } } - void markResultSetClosed(int index) { - if (index >= 0 && index < openResultSets.size()) { - openResultSets.set(index, null); - } - } - - public void closeAllResultSets() { - for (UDFResultSetImpl rs : openResultSets) { + @Override + public void close() { + for (int i = 0; i < openResultSets.size(); i++) { + UDFResultSetImpl rs = openResultSets.get(i); if (rs != null) { - rs.close(); + try { + rs.close(); + } catch (UDFException e) { + LOGGER.warn("Failed to close UDF result set at index {}", i, e); + } } } openResultSets.clear(); } - @Override - public void close() { - if (closed) { - return; + private long computeRemainingTimeoutMs() { + long outerStart = System.currentTimeMillis(); + long outerTimeout = IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(); + for (IQueryExecution execution : COORDINATOR.getAllQueryExecutions()) { + if (outerQueryId.getId().equals(execution.getQueryId())) { + outerStart = execution.getStartExecutionTime(); + outerTimeout = execution.getTimeout(); + break; + } } - closed = true; - closeAllResultSets(); - SESSION_MANAGER.closeSession(internalSession, COORDINATOR::cleanupQueryExecution); + return outerTimeout - (System.currentTimeMillis() - outerStart); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/ScalarUdfExpressionDetector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/ScalarUdfExpressionDetector.java deleted file mode 100644 index cf87bee846a..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/ScalarUdfExpressionDetector.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.udf; - -import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression; -import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.FunctionCall; -import org.apache.iotdb.commons.queryengine.plan.udf.TableUDFUtils; -import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DefaultTraversalVisitor; - -import java.util.Optional; - -/** Detects whether filter or project expressions contain a user-defined scalar function. */ -public final class ScalarUdfExpressionDetector extends DefaultTraversalVisitor<Void> { - - private boolean found; - - private ScalarUdfExpressionDetector() {} - - public static boolean contains(Optional<Expression> predicate, Expression[] projectExpressions) { - ScalarUdfExpressionDetector detector = new ScalarUdfExpressionDetector(); - if (predicate.isPresent()) { - detector.process(predicate.get(), null); - } - for (Expression expression : projectExpressions) { - if (detector.found) { - return true; - } - detector.process(expression, null); - } - return detector.found; - } - - @Override - public Void visitFunctionCall(FunctionCall node, Void context) { - if (TableUDFUtils.isScalarFunction(node.getName().getSuffix())) { - found = true; - return null; - } - return super.visitFunctionCall(node, context); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java index 647e0115a64..bc3f163eee8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java @@ -23,7 +23,6 @@ import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.Re import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.schema.column.ColumnHeader; import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; -import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.udf.api.UDFResultSet; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.access.Record; @@ -41,19 +40,20 @@ import java.util.stream.Collectors; /** Server-side implementation of {@link UDFResultSet}. */ public class UDFResultSetImpl implements UDFResultSet { - private final InternalQueryResult queryResult; - private final IoTDBLocalImpl owner; + private final List<UDFResultSetImpl> openResultSets; private final int index; + private final InternalQueryResult queryResult; + private final List<Type> columnTypes; private Iterator<Record> rowIterator; - private final List<Type> columnTypes; private boolean closed; - public UDFResultSetImpl(InternalQueryResult queryResult, IoTDBLocalImpl owner, int index) { - this.queryResult = queryResult; - this.owner = owner; + public UDFResultSetImpl( + List<UDFResultSetImpl> openResultSets, int index, InternalQueryResult queryResult) { + this.openResultSets = openResultSets; this.index = index; - this.columnTypes = buildColumnTypes(queryResult.getQueryExecution()); + this.queryResult = queryResult; + this.columnTypes = buildColumnTypes(queryResult.getDatasetHeader().getColumnHeaders()); } @Override @@ -68,7 +68,7 @@ public class UDFResultSetImpl implements UDFResultSet { try { batch = queryResult.getQueryExecution().getBatchResult(); } catch (IoTDBException e) { - throw new UDFException(e.getMessage()); + throw new UDFException(e.getMessage(), e); } if (!batch.isPresent()) { return false; @@ -85,9 +85,7 @@ public class UDFResultSetImpl implements UDFResultSet { @Override public Record next() throws UDFException { - ensureOpen(); - - if (rowIterator == null || !rowIterator.hasNext()) { + if (!hasNext()) { throw new NoSuchElementException(); } return rowIterator.next(); @@ -99,12 +97,12 @@ public class UDFResultSetImpl implements UDFResultSet { return; } closed = true; + openResultSets.set(index, null); try { queryResult.close(); } catch (RuntimeException e) { throw new UDFException("Failed to close internal query result", e); } - owner.markResultSetClosed(index); } private void ensureOpen() throws UDFException { @@ -113,8 +111,8 @@ public class UDFResultSetImpl implements UDFResultSet { } } - private static List<Type> buildColumnTypes(IQueryExecution queryExecution) { - return queryExecution.getDatasetHeader().getColumnHeaders().stream() + private static List<Type> buildColumnTypes(List<ColumnHeader> columnHeaders) { + return columnHeaders.stream() .map(ColumnHeader::getColumnType) .map(UDFDataTypeTransformer::transformToUDFDataType) .map(UDFDataTypeTransformer::transformUDFDataTypeToReadType)
