This is an automated email from the ASF dual-hosted git repository. Wei-hao-Li pushed a commit to branch IoTDBLocal in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fd0846f313925c050e7d2cf78fd9ca1d4b7ece08 Author: Weihao Li <[email protected]> AuthorDate: Wed Jun 24 02:19:53 2026 +0800 draft Signed-off-by: Weihao Li <[email protected]> --- .../java/org/apache/iotdb/udf/api/IoTDBLocal.java | 73 ++++++++ .../org/apache/iotdb/udf/api/UDFResultSet.java | 45 +++++ .../udf/api/relational/AggregateFunction.java | 9 + .../iotdb/udf/api/relational/ScalarFunction.java | 19 +++ .../relational/ColumnTransformerBuilder.java | 45 ++++- .../calc/plan/planner/TableOperatorGenerator.java | 31 +++- .../udf/UserDefineScalarFunctionTransformer.java | 23 ++- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 4 +- .../iotdb/db/queryengine/plan/Coordinator.java | 46 ++++- .../planner/DataNodeTableOperatorGenerator.java | 37 +++- .../db/queryengine/udf/InternalQueryExecutor.java | 126 ++++++++++++++ .../db/queryengine/udf/InternalQueryResult.java | 43 +++++ .../iotdb/db/queryengine/udf/IoTDBLocalImpl.java | 188 +++++++++++++++++++++ .../udf/ScalarUdfExpressionDetector.java | 58 +++++++ .../iotdb/db/queryengine/udf/UDFResultSetImpl.java | 123 ++++++++++++++ 15 files changed, 857 insertions(+), 13 deletions(-) diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java new file mode 100644 index 00000000000..0c96f919fc1 --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.api; + +import org.apache.iotdb.udf.api.exception.UDFException; + +/** + * Entry point for UDF-embedded SQL queries and logging, injected by the framework during UDF + * lifecycle. Reuses the outer query caller's user identity and permissions without extra client + * dependencies. + */ +public interface IoTDBLocal { + + /** + * Execute a single read-only table-model SQL statement and return a streaming result set. + * + * @param sql table-model read SQL (e.g. SELECT, SHOW CONFIGNODES) + * @return query result set; call {@link UDFResultSet#close()} when done or rely on framework + * cleanup + * @throws UDFException on parse failure, permission denied, or execution error + */ + UDFResultSet query(String sql) throws UDFException; + + /** Log at INFO level. */ + void info(String msg); + + /** Log at INFO level with formatting. */ + void info(String format, Object... args); + + /** Log at INFO level with exception stack. */ + void info(String msg, Throwable t); + + /** Log at WARN level. */ + void warn(String msg); + + /** Log at WARN level with formatting. */ + void warn(String format, Object... args); + + /** Log at WARN level with exception stack. */ + void warn(String msg, Throwable t); + + /** Log at ERROR level. */ + void error(String msg); + + /** Log at ERROR level with formatting. */ + void error(String format, Object... args); + + /** Log at ERROR level with exception stack. */ + void error(String msg, Throwable t); + + /** + * Release internal session and other resources. Called by the framework after beforeDestroy + * method. + */ + void close(); +} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/UDFResultSet.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/UDFResultSet.java new file mode 100644 index 00000000000..ed6d13442bf --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/UDFResultSet.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.api; + +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.access.Record; + +/** Streaming result set returned by {@link IoTDBLocal#query(String)}. */ +public interface UDFResultSet extends AutoCloseable { + + /** + * @return {@code true} if another row is available + * @throws UDFException if underlying read fails + */ + boolean hasNext() throws UDFException; + + /** + * @return the next row + * @throws UDFException if underlying read fails + * @throws java.util.NoSuchElementException if no more rows (consistent with {@link + * java.util.Iterator}) + */ + Record next() throws UDFException; + + /** Release query resources. Repeated calls must be idempotent. */ + @Override + void close() throws UDFException; +} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java index 6c9d385ac24..67fdb0aa2b1 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java @@ -19,6 +19,7 @@ package org.apache.iotdb.udf.api.relational; +import org.apache.iotdb.udf.api.IoTDBLocal; import org.apache.iotdb.udf.api.State; import org.apache.iotdb.udf.api.customizer.analysis.AggregateFunctionAnalysis; import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments; @@ -58,6 +59,10 @@ public interface AggregateFunction extends SQLFunction { // do nothing } + default void beforeStart(FunctionArguments arguments, IoTDBLocal local) throws UDFException { + beforeStart(arguments); + } + /** Create and initialize state. You may bind some resource in this method. */ State createState(); @@ -101,4 +106,8 @@ public interface AggregateFunction extends SQLFunction { default void beforeDestroy() { // do nothing } + + default void beforeDestroy(IoTDBLocal local) { + beforeDestroy(); + } } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java index 68d7793093d..e1daab6e4b0 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java @@ -19,6 +19,7 @@ package org.apache.iotdb.udf.api.relational; +import org.apache.iotdb.udf.api.IoTDBLocal; import org.apache.iotdb.udf.api.customizer.analysis.ScalarFunctionAnalysis; import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments; import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException; @@ -54,6 +55,14 @@ public interface ScalarFunction extends SQLFunction { // do nothing } + /** + * Same as {@link #beforeStart(FunctionArguments)} with access to {@link IoTDBLocal} for embedded + * queries. + */ + default void beforeStart(FunctionArguments arguments, IoTDBLocal local) throws UDFException { + beforeStart(arguments); + } + /** * This method will be called to process the transformation. In a single UDF query, this method * may be called multiple times. @@ -63,8 +72,18 @@ public interface ScalarFunction extends SQLFunction { */ Object evaluate(Record input) throws UDFException; + /** Same as {@link #evaluate(Record)} with access to {@link IoTDBLocal} for embedded queries. */ + default Object evaluate(Record input, IoTDBLocal local) throws UDFException { + return evaluate(input); + } + /** This method is mainly used to release the resources used in the ScalarFunction. */ default void beforeDestroy() { // do nothing } + + /** Same as {@link #beforeDestroy()} with access to {@link IoTDBLocal}. */ + default void beforeDestroy(IoTDBLocal local) { + beforeDestroy(); + } } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java index a25188361af..782222a1394 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java @@ -198,6 +198,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.type.TypeNotFoundExc import org.apache.iotdb.commons.queryengine.plan.udf.TableUDFUtils; import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; +import org.apache.iotdb.udf.api.IoTDBLocal; import org.apache.iotdb.udf.api.customizer.analysis.ScalarFunctionAnalysis; import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments; import org.apache.iotdb.udf.api.relational.ScalarFunction; @@ -1489,11 +1490,16 @@ public class ColumnTransformerBuilder .collect(Collectors.toList()), Collections.emptyMap()); ScalarFunctionAnalysis analysis = scalarFunction.analyze(parameters); - scalarFunction.beforeStart(parameters); + Optional<IoTDBLocal> ioTDBLocal = context.getIoTDBLocal(); + if (ioTDBLocal.isPresent()) { + scalarFunction.beforeStart(parameters, ioTDBLocal.get()); + } else { + scalarFunction.beforeStart(parameters); + } Type returnType = UDFDataTypeTransformer.transformUDFDataTypeToReadType(analysis.getOutputDataType()); return new UserDefineScalarFunctionTransformer( - returnType, scalarFunction, childrenColumnTransformer); + returnType, scalarFunction, childrenColumnTransformer, ioTDBLocal.orElse(null)); } } throw new IllegalArgumentException( @@ -1954,6 +1960,8 @@ public class ColumnTransformerBuilder @SuppressWarnings("unused") private final Optional<MemoryReservationManager> memoryReservationManager; + private final Optional<IoTDBLocal> ioTDBLocal; + public Context( SessionInfo sessionInfo, List<LeafColumnTransformer> leafList, @@ -1966,6 +1974,34 @@ public class ColumnTransformerBuilder ITableTypeProvider typeProvider, ITypeMetadata metadata, @Nullable MemoryReservationManager memoryReservationManager) { + this( + sessionInfo, + leafList, + inputLocations, + cache, + hasSeen, + commonTransformerList, + inputDataTypes, + originSize, + typeProvider, + metadata, + memoryReservationManager, + Optional.empty()); + } + + public Context( + SessionInfo sessionInfo, + List<LeafColumnTransformer> leafList, + Map<Symbol, List<InputLocation>> inputLocations, + Map<Expression, ColumnTransformer> cache, + Map<Expression, ColumnTransformer> hasSeen, + List<ColumnTransformer> commonTransformerList, + List<TSDataType> inputDataTypes, + int originSize, + ITableTypeProvider typeProvider, + ITypeMetadata metadata, + @Nullable MemoryReservationManager memoryReservationManager, + Optional<IoTDBLocal> ioTDBLocal) { this.sessionInfo = sessionInfo; this.leafList = leafList; this.inputLocations = inputLocations; @@ -1977,6 +2013,11 @@ public class ColumnTransformerBuilder this.typeProvider = typeProvider; this.metadata = metadata; this.memoryReservationManager = Optional.ofNullable(memoryReservationManager); + this.ioTDBLocal = ioTDBLocal; + } + + public Optional<IoTDBLocal> getIoTDBLocal() { + return ioTDBLocal; } public Type getType(SymbolReference symbolReference) { diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java index bfac196461e..8b7f6cb714b 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java @@ -169,6 +169,7 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Literal; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.SymbolReference; import org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager; +import org.apache.iotdb.udf.api.IoTDBLocal; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; @@ -313,6 +314,30 @@ public abstract class TableOperatorGenerator< Map<Symbol, List<InputLocation>> inputLocations, PlanNodeId planNodeId, C context) { + return constructFilterAndProjectOperator( + predicate, + inputOperator, + projectExpressions, + inputDataTypes, + inputLocations, + planNodeId, + context, + getIoTDBLocal(context, planNodeId)); + } + + protected Optional<IoTDBLocal> getIoTDBLocal(C context, PlanNodeId planNodeId) { + return Optional.empty(); + } + + protected Operator constructFilterAndProjectOperator( + Optional<Expression> predicate, + Operator inputOperator, + Expression[] projectExpressions, + List<TSDataType> inputDataTypes, + Map<Symbol, List<InputLocation>> inputLocations, + PlanNodeId planNodeId, + C context, + Optional<IoTDBLocal> ioTDBLocal) { final List<TSDataType> filterOutputDataTypes = new ArrayList<>(inputDataTypes); @@ -341,7 +366,8 @@ public abstract class TableOperatorGenerator< 0, context.getTableTypeProvider(), metadata, - context.getMemoryReservationManager()); + context.getMemoryReservationManager(), + ioTDBLocal); return visitor.process(p, filterColumnTransformerContext); }) @@ -369,7 +395,8 @@ public abstract class TableOperatorGenerator< inputLocations.size(), context.getTableTypeProvider(), metadata, - context.getMemoryReservationManager()); + context.getMemoryReservationManager(), + ioTDBLocal); for (Expression expression : projectExpressions) { projectOutputTransformerList.add( diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java index 4e22315b628..dd7c175774d 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java @@ -22,6 +22,7 @@ package org.apache.iotdb.calc.transformation.dag.column.udf; import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.RecordIterator; import org.apache.iotdb.calc.transformation.dag.column.ColumnTransformer; import org.apache.iotdb.calc.transformation.dag.column.multi.MultiColumnTransformer; +import org.apache.iotdb.udf.api.IoTDBLocal; import org.apache.iotdb.udf.api.relational.ScalarFunction; import org.apache.iotdb.udf.api.relational.access.Record; @@ -36,13 +37,16 @@ public class UserDefineScalarFunctionTransformer extends MultiColumnTransformer private final ScalarFunction scalarFunction; private final List<Type> inputTypes; + private final IoTDBLocal ioTDBLocal; public UserDefineScalarFunctionTransformer( Type returnType, ScalarFunction scalarFunction, - List<ColumnTransformer> childrenTransformers) { + List<ColumnTransformer> childrenTransformers, + IoTDBLocal ioTDBLocal) { super(returnType, childrenTransformers); this.scalarFunction = scalarFunction; + this.ioTDBLocal = ioTDBLocal; this.inputTypes = childrenTransformers.stream().map(ColumnTransformer::getType).collect(Collectors.toList()); } @@ -53,7 +57,10 @@ public class UserDefineScalarFunctionTransformer extends MultiColumnTransformer RecordIterator iterator = new RecordIterator(childrenColumns, inputTypes, positionCount); while (iterator.hasNext()) { try { - Object result = scalarFunction.evaluate(iterator.next()); + Object result = + ioTDBLocal != null + ? scalarFunction.evaluate(iterator.next(), ioTDBLocal) + : scalarFunction.evaluate(iterator.next()); if (result == null) { builder.appendNull(); } else { @@ -80,7 +87,10 @@ public class UserDefineScalarFunctionTransformer extends MultiColumnTransformer builder.appendNull(); continue; } - Object result = scalarFunction.evaluate(input); + Object result = + ioTDBLocal != null + ? scalarFunction.evaluate(input, ioTDBLocal) + : scalarFunction.evaluate(input); if (result == null) { builder.appendNull(); } else { @@ -98,7 +108,12 @@ public class UserDefineScalarFunctionTransformer extends MultiColumnTransformer @Override public void close() { super.close(); - scalarFunction.beforeDestroy(); + if (ioTDBLocal != null) { + scalarFunction.beforeDestroy(ioTDBLocal); + ioTDBLocal.close(); + } else { + scalarFunction.beforeDestroy(); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 54ac31ce7f7..1aa14e053ea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -562,7 +562,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } - private void clearUp( + private static void clearUp( IClientSession clientSession, Long statementId, Long queryId, @@ -572,7 +572,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { clientSession.removeQueryId(statementId, queryId); } - private void clearUp( + private static void clearUp( IClientSession clientSession, Long statementId, Long queryId, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 73a09597cd5..45b8643d02c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.queryengine.plan; import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; @@ -30,6 +29,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.SemanticException; import org.apache.iotdb.commons.memory.IMemoryBlock; import org.apache.iotdb.commons.memory.MemoryBlockType; @@ -155,6 +155,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite.StatementRewr import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; +import org.apache.iotdb.db.queryengine.udf.InternalQueryExecutor; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.thrift.TBase; @@ -307,6 +308,18 @@ public class Coordinator { boolean userQuery, boolean debug, BiFunction<MPPQueryContext, Long, IQueryExecution> iQueryExecutionFactory) { + return execution(queryId, session, sql, userQuery, debug, false, iQueryExecutionFactory); + } + + private ExecutionResult execution( + long queryId, + SessionInfo session, + String sql, + boolean userQuery, + boolean debug, + boolean readOnlyInternalQuery, + BiFunction<MPPQueryContext, Long, IQueryExecution> iQueryExecutionFactory) + throws IoTDBException { long startTime = System.currentTimeMillis(); QueryId globalQueryId = queryIdGenerator.createNextQueryId(); MPPQueryContext queryContext = null; @@ -325,6 +338,9 @@ public class Coordinator { queryContext.setUserQuery(userQuery); queryContext.setDebug(debug); IQueryExecution execution = iQueryExecutionFactory.apply(queryContext, startTime); + if (readOnlyInternalQuery) { + InternalQueryExecutor.validateReadOnlyQuery(execution); + } if (execution.isQuery()) { queryExecutionMap.put(queryId, execution); } else { @@ -488,12 +504,40 @@ public class Coordinator { long timeOut, boolean userQuery, boolean debug) { + return executeForTableModel( + statement, + sqlParser, + clientSession, + queryId, + session, + sql, + metadata, + timeOut, + userQuery, + debug, + false); + } + + public ExecutionResult executeForTableModel( + org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement statement, + SqlParser sqlParser, + IClientSession clientSession, + long queryId, + SessionInfo session, + String sql, + Metadata metadata, + long timeOut, + boolean userQuery, + boolean debug, + boolean readOnlyInternalQuery) + throws IoTDBException { return execution( queryId, session, sql, userQuery, debug, + readOnlyInternalQuery, ((queryContext, startTime) -> createQueryExecutionForTableModel( statement, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index 185c0d12c06..9a2e35a8c1e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -68,7 +68,6 @@ import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle; import org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle; import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager; -import org.apache.iotdb.db.queryengine.execution.operator.EmptyDataOperator; import org.apache.iotdb.db.queryengine.execution.operator.ExplainAnalyzeOperator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.process.TableIntoOperator; @@ -132,13 +131,15 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.Table import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryCountNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryScanNode; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.queryengine.udf.IoTDBLocalImpl; +import org.apache.iotdb.db.queryengine.udf.ScalarUdfExpressionDetector; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils; +import org.apache.iotdb.udf.api.IoTDBLocal; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.idcolumn.FourOrHigherLevelDBExtractor; @@ -2101,4 +2102,36 @@ public class DataNodeTableOperatorGenerator protected SessionInfo getSessionInfo(LocalExecutionPlanContext context) { return context.getDriverContext().getFragmentInstanceContext().getSessionInfo(); } + + @Override + protected Optional<IoTDBLocal> getIoTDBLocal( + LocalExecutionPlanContext context, PlanNodeId planNodeId) { + return Optional.of( + IoTDBLocalImpl.create( + getSessionInfo(context), context.getFragmentInstanceId(), planNodeId)); + } + + @Override + protected Operator constructFilterAndProjectOperator( + Optional<Expression> predicate, + Operator inputOperator, + Expression[] projectExpressions, + List<TSDataType> inputDataTypes, + Map<Symbol, List<InputLocation>> inputLocations, + PlanNodeId planNodeId, + LocalExecutionPlanContext context) { + Optional<IoTDBLocal> ioTDBLocal = + ScalarUdfExpressionDetector.contains(predicate, projectExpressions) + ? getIoTDBLocal(context, planNodeId) + : Optional.empty(); + return constructFilterAndProjectOperator( + predicate, + inputOperator, + projectExpressions, + inputDataTypes, + inputLocations, + planNodeId, + context, + ioTDBLocal); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java new file mode 100644 index 00000000000..5a31c5910f7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.udf; + +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.QueryTimeoutException; +import org.apache.iotdb.commons.exception.SemanticException; +import org.apache.iotdb.commons.queryengine.common.SessionInfo; +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.plan.Coordinator; +import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; +import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; +import org.apache.iotdb.rpc.TSStatusCode; + +/** Stateless utility for UDF embedded table-model queries. */ +public final class InternalQueryExecutor { + + private static final Coordinator COORDINATOR = Coordinator.getInstance(); + private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); + private static final Metadata METADATA = LocalExecutionPlanner.getInstance().metadata; + private static final SqlParser RELATION_SQL_PARSER = new SqlParser(); + + private InternalQueryExecutor() {} + + public static long computeRemainingTimeoutMs( + long outerQueryStartTimeMs, long outerQueryTimeoutMs) { + return outerQueryTimeoutMs - (System.currentTimeMillis() - outerQueryStartTimeMs); + } + + public static InternalQueryResult executeInternalQuery( + IClientSession internalSession, + SessionInfo sessionInfo, + String sql, + long outerQueryStartTimeMs, + long outerQueryTimeoutMs) + throws IoTDBException { + + long timeoutMs = computeRemainingTimeoutMs(outerQueryStartTimeMs, outerQueryTimeoutMs); + if (timeoutMs <= 0) { + throw new QueryTimeoutException( + "Outer query timeout exceeded before UDF internal query starts"); + } + + Statement parsedStatement = parseTableStatement(internalSession, sessionInfo, sql); + + long statementId = SESSION_MANAGER.requestStatementId(internalSession); + long queryId = SESSION_MANAGER.requestQueryId(internalSession, statementId); + + ExecutionResult result = + COORDINATOR.executeForTableModel( + parsedStatement, + RELATION_SQL_PARSER, + internalSession, + queryId, + sessionInfo, + sql, + METADATA, + timeoutMs, + false, + false, + true); + + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + COORDINATOR.cleanupQueryExecution(queryId, () -> sql, null); + internalSession.removeQueryId(statementId, queryId); + throw new IoTDBException(result.status.message, result.status.code); + } + + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + if (queryExecution == null) { + COORDINATOR.cleanupQueryExecution(queryId, () -> sql, null); + internalSession.removeQueryId(statementId, queryId); + throw new IoTDBException( + "Internal query execution not found", TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + } + + return new InternalQueryResult( + queryExecution, + () -> { + COORDINATOR.cleanupQueryExecution(queryId, () -> sql, null); + internalSession.removeQueryId(statementId, queryId); + }); + } + + static void validateReadOnlyQuery(IQueryExecution execution) throws IoTDBException { + if (execution.getQueryType() != QueryType.READ) { + execution.stopAndCleanup(null); + throw new SemanticException("Only query is allowed when used IoTDBLocal in UDF"); + } + } + + private static Statement parseTableStatement( + IClientSession internalSession, SessionInfo sessionInfo, String sql) throws IoTDBException { + try { + Statement statement = + RELATION_SQL_PARSER.createStatement(sql, sessionInfo.getZoneId(), internalSession); + return statement; + } catch (Exception e) { + throw new IoTDBException(e.getMessage(), TSStatusCode.SQL_PARSE_ERROR.getStatusCode()); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java new file mode 100644 index 00000000000..9f35fa95618 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.udf; + +import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; + +/** Internal query result holding {@link IQueryExecution} and a cleanup action. */ +public final class InternalQueryResult implements AutoCloseable { + + private final IQueryExecution queryExecution; + private final Runnable releaseAction; + + public InternalQueryResult(IQueryExecution queryExecution, Runnable releaseAction) { + this.queryExecution = queryExecution; + this.releaseAction = releaseAction; + } + + public IQueryExecution getQueryExecution() { + return queryExecution; + } + + @Override + public void close() { + releaseAction.run(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java new file mode 100644 index 00000000000..32540b20e06 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.udf; + +import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.queryengine.common.SessionInfo; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.protocol.session.InternalClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.plan.Coordinator; +import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; +import org.apache.iotdb.udf.api.IoTDBLocal; +import org.apache.iotdb.udf.api.UDFResultSet; +import org.apache.iotdb.udf.api.exception.UDFException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** Server-side implementation of {@link IoTDBLocal}. */ +public class IoTDBLocalImpl implements IoTDBLocal { + + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLocalImpl.class); + private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); + private static final Coordinator COORDINATOR = Coordinator.getInstance(); + + private final SessionInfo sessionInfo; + private final InternalClientSession internalSession; + private final long outerQueryStartTimeMs; + private final long outerQueryTimeoutMs; + private final List<UDFResultSetImpl> openResultSets = new ArrayList<>(); + private boolean closed; + + public IoTDBLocalImpl( + SessionInfo sessionInfo, + String internalClientId, + long outerQueryStartTimeMs, + long outerQueryTimeoutMs) { + this.sessionInfo = sessionInfo; + this.outerQueryStartTimeMs = outerQueryStartTimeMs; + this.outerQueryTimeoutMs = outerQueryTimeoutMs; + this.internalSession = new InternalClientSession(internalClientId); + internalSession.setSqlDialect(sessionInfo.getSqlDialect()); + sessionInfo.getDatabaseName().ifPresent(internalSession::setDatabaseName); + SESSION_MANAGER.supplySession( + internalSession, + sessionInfo.getUserId(), + sessionInfo.getUserName(), + sessionInfo.getZoneId(), + ClientVersion.V_1_0); + } + + public static String formatInternalClientId( + FragmentInstanceId fragmentInstanceId, PlanNodeId planNodeId) { + return "udf-local-" + fragmentInstanceId + "-" + planNodeId; + } + + public static IoTDBLocalImpl create( + SessionInfo sessionInfo, FragmentInstanceId fragmentInstanceId, PlanNodeId planNodeId) { + long outerStart = System.currentTimeMillis(); + long outerTimeout = + org.apache.iotdb.db.conf.IoTDBDescriptor.getInstance() + .getConfig() + .getQueryTimeoutThreshold(); + String globalQueryId = fragmentInstanceId.getQueryId().getId(); + for (IQueryExecution execution : COORDINATOR.getAllQueryExecutions()) { + if (globalQueryId.equals(execution.getQueryId())) { + outerStart = execution.getStartExecutionTime(); + outerTimeout = execution.getTimeout(); + break; + } + } + return new IoTDBLocalImpl( + sessionInfo, + formatInternalClientId(fragmentInstanceId, planNodeId), + outerStart, + outerTimeout); + } + + @Override + public UDFResultSet query(String sql) throws UDFException { + if (closed) { + throw new UDFException("IoTDBLocal is already closed"); + } + try { + InternalQueryResult result = + InternalQueryExecutor.executeInternalQuery( + internalSession, sessionInfo, sql, outerQueryStartTimeMs, outerQueryTimeoutMs); + int index = openResultSets.size(); + UDFResultSetImpl rs = new UDFResultSetImpl(result, this, index); + openResultSets.add(rs); + return rs; + } catch (IoTDBException e) { + throw new UDFException(e.getMessage(), e); + } + } + + void markResultSetClosed(int index) { + if (index >= 0 && index < openResultSets.size()) { + openResultSets.set(index, null); + } + } + + public void closeAllResultSets() { + for (UDFResultSetImpl rs : openResultSets) { + if (rs != null) { + rs.close(); + } + } + openResultSets.clear(); + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + closeAllResultSets(); + SESSION_MANAGER.closeSession(internalSession, COORDINATOR::cleanupQueryExecution); + } + + @Override + public void info(String msg) { + LOGGER.info(msg); + } + + @Override + public void info(String format, Object... args) { + LOGGER.info(format, args); + } + + @Override + public void info(String msg, Throwable t) { + LOGGER.info(msg, t); + } + + @Override + public void warn(String msg) { + LOGGER.warn(msg); + } + + @Override + public void warn(String format, Object... args) { + LOGGER.warn(format, args); + } + + @Override + public void warn(String msg, Throwable t) { + LOGGER.warn(msg, t); + } + + @Override + public void error(String msg) { + LOGGER.error(msg); + } + + @Override + public void error(String format, Object... args) { + LOGGER.error(format, args); + } + + @Override + public void error(String msg, Throwable t) { + LOGGER.error(msg, t); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/ScalarUdfExpressionDetector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/ScalarUdfExpressionDetector.java new file mode 100644 index 00000000000..cf87bee846a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/ScalarUdfExpressionDetector.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.udf; + +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.FunctionCall; +import org.apache.iotdb.commons.queryengine.plan.udf.TableUDFUtils; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DefaultTraversalVisitor; + +import java.util.Optional; + +/** Detects whether filter or project expressions contain a user-defined scalar function. */ +public final class ScalarUdfExpressionDetector extends DefaultTraversalVisitor<Void> { + + private boolean found; + + private ScalarUdfExpressionDetector() {} + + public static boolean contains(Optional<Expression> predicate, Expression[] projectExpressions) { + ScalarUdfExpressionDetector detector = new ScalarUdfExpressionDetector(); + if (predicate.isPresent()) { + detector.process(predicate.get(), null); + } + for (Expression expression : projectExpressions) { + if (detector.found) { + return true; + } + detector.process(expression, null); + } + return detector.found; + } + + @Override + public Void visitFunctionCall(FunctionCall node, Void context) { + if (TableUDFUtils.isScalarFunction(node.getName().getSuffix())) { + found = true; + return null; + } + return super.visitFunctionCall(node, context); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java new file mode 100644 index 00000000000..647e0115a64 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.udf; + +import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.RecordIterator; +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; +import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; +import org.apache.iotdb.udf.api.UDFResultSet; +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.access.Record; + +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.type.Type; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.stream.Collectors; + +/** Server-side implementation of {@link UDFResultSet}. */ +public class UDFResultSetImpl implements UDFResultSet { + + private final InternalQueryResult queryResult; + private final IoTDBLocalImpl owner; + private final int index; + + private Iterator<Record> rowIterator; + private final List<Type> columnTypes; + private boolean closed; + + public UDFResultSetImpl(InternalQueryResult queryResult, IoTDBLocalImpl owner, int index) { + this.queryResult = queryResult; + this.owner = owner; + this.index = index; + this.columnTypes = buildColumnTypes(queryResult.getQueryExecution()); + } + + @Override + public boolean hasNext() throws UDFException { + ensureOpen(); + + while (rowIterator == null || !rowIterator.hasNext()) { + if (!queryResult.getQueryExecution().hasNextResult()) { + return false; + } + Optional<TsBlock> batch; + try { + batch = queryResult.getQueryExecution().getBatchResult(); + } catch (IoTDBException e) { + throw new UDFException(e.getMessage()); + } + if (!batch.isPresent()) { + return false; + } + TsBlock currentBlock = batch.get(); + rowIterator = + new RecordIterator( + Arrays.asList(currentBlock.getValueColumns()), + columnTypes, + currentBlock.getPositionCount()); + } + return true; + } + + @Override + public Record next() throws UDFException { + ensureOpen(); + + if (rowIterator == null || !rowIterator.hasNext()) { + throw new NoSuchElementException(); + } + return rowIterator.next(); + } + + @Override + public void close() throws UDFException { + if (closed) { + return; + } + closed = true; + try { + queryResult.close(); + } catch (RuntimeException e) { + throw new UDFException("Failed to close internal query result", e); + } + owner.markResultSetClosed(index); + } + + private void ensureOpen() throws UDFException { + if (closed) { + throw new UDFException("UDFResultSet is already closed"); + } + } + + private static List<Type> buildColumnTypes(IQueryExecution queryExecution) { + return queryExecution.getDatasetHeader().getColumnHeaders().stream() + .map(ColumnHeader::getColumnType) + .map(UDFDataTypeTransformer::transformToUDFDataType) + .map(UDFDataTypeTransformer::transformUDFDataTypeToReadType) + .collect(Collectors.toList()); + } +}
