This is an automated email from the ASF dual-hosted git repository.

Wei-hao-Li pushed a commit to branch IoTDBLocal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fd0846f313925c050e7d2cf78fd9ca1d4b7ece08
Author: Weihao Li <[email protected]>
AuthorDate: Wed Jun 24 02:19:53 2026 +0800

    draft
    
    Signed-off-by: Weihao Li <[email protected]>
---
 .../java/org/apache/iotdb/udf/api/IoTDBLocal.java  |  73 ++++++++
 .../org/apache/iotdb/udf/api/UDFResultSet.java     |  45 +++++
 .../udf/api/relational/AggregateFunction.java      |   9 +
 .../iotdb/udf/api/relational/ScalarFunction.java   |  19 +++
 .../relational/ColumnTransformerBuilder.java       |  45 ++++-
 .../calc/plan/planner/TableOperatorGenerator.java  |  31 +++-
 .../udf/UserDefineScalarFunctionTransformer.java   |  23 ++-
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |   4 +-
 .../iotdb/db/queryengine/plan/Coordinator.java     |  46 ++++-
 .../planner/DataNodeTableOperatorGenerator.java    |  37 +++-
 .../db/queryengine/udf/InternalQueryExecutor.java  | 126 ++++++++++++++
 .../db/queryengine/udf/InternalQueryResult.java    |  43 +++++
 .../iotdb/db/queryengine/udf/IoTDBLocalImpl.java   | 188 +++++++++++++++++++++
 .../udf/ScalarUdfExpressionDetector.java           |  58 +++++++
 .../iotdb/db/queryengine/udf/UDFResultSetImpl.java | 123 ++++++++++++++
 15 files changed, 857 insertions(+), 13 deletions(-)

diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java
new file mode 100644
index 00000000000..0c96f919fc1
--- /dev/null
+++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf.api;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+
+/**
+ * Entry point for UDF-embedded SQL queries and logging, injected by the 
framework during UDF
+ * lifecycle. Reuses the outer query caller's user identity and permissions 
without extra client
+ * dependencies.
+ */
+public interface IoTDBLocal {
+
+  /**
+   * Execute a single read-only table-model SQL statement and return a 
streaming result set.
+   *
+   * @param sql table-model read SQL (e.g. SELECT, SHOW CONFIGNODES)
+   * @return query result set; call {@link UDFResultSet#close()} when done or 
rely on framework
+   *     cleanup
+   * @throws UDFException on parse failure, permission denied, or execution 
error
+   */
+  UDFResultSet query(String sql) throws UDFException;
+
+  /** Log at INFO level. */
+  void info(String msg);
+
+  /** Log at INFO level with formatting. */
+  void info(String format, Object... args);
+
+  /** Log at INFO level with exception stack. */
+  void info(String msg, Throwable t);
+
+  /** Log at WARN level. */
+  void warn(String msg);
+
+  /** Log at WARN level with formatting. */
+  void warn(String format, Object... args);
+
+  /** Log at WARN level with exception stack. */
+  void warn(String msg, Throwable t);
+
+  /** Log at ERROR level. */
+  void error(String msg);
+
+  /** Log at ERROR level with formatting. */
+  void error(String format, Object... args);
+
+  /** Log at ERROR level with exception stack. */
+  void error(String msg, Throwable t);
+
+  /**
+   * Release internal session and other resources. Called by the framework 
after beforeDestroy
+   * method.
+   */
+  void close();
+}
diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/UDFResultSet.java 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/UDFResultSet.java
new file mode 100644
index 00000000000..ed6d13442bf
--- /dev/null
+++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/UDFResultSet.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf.api;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.access.Record;
+
+/** Streaming result set returned by {@link IoTDBLocal#query(String)}. */
+public interface UDFResultSet extends AutoCloseable {
+
+  /**
+   * @return {@code true} if another row is available
+   * @throws UDFException if underlying read fails
+   */
+  boolean hasNext() throws UDFException;
+
+  /**
+   * @return the next row
+   * @throws UDFException if underlying read fails
+   * @throws java.util.NoSuchElementException if no more rows (consistent with 
{@link
+   *     java.util.Iterator})
+   */
+  Record next() throws UDFException;
+
+  /** Release query resources. Repeated calls must be idempotent. */
+  @Override
+  void close() throws UDFException;
+}
diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java
 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java
index 6c9d385ac24..67fdb0aa2b1 100644
--- 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java
+++ 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregateFunction.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.udf.api.relational;
 
+import org.apache.iotdb.udf.api.IoTDBLocal;
 import org.apache.iotdb.udf.api.State;
 import org.apache.iotdb.udf.api.customizer.analysis.AggregateFunctionAnalysis;
 import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments;
@@ -58,6 +59,10 @@ public interface AggregateFunction extends SQLFunction {
     // do nothing
   }
 
+  default void beforeStart(FunctionArguments arguments, IoTDBLocal local) 
throws UDFException {
+    beforeStart(arguments);
+  }
+
   /** Create and initialize state. You may bind some resource in this method. 
*/
   State createState();
 
@@ -101,4 +106,8 @@ public interface AggregateFunction extends SQLFunction {
   default void beforeDestroy() {
     // do nothing
   }
+
+  default void beforeDestroy(IoTDBLocal local) {
+    beforeDestroy();
+  }
 }
diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java
 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java
index 68d7793093d..e1daab6e4b0 100644
--- 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java
+++ 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.udf.api.relational;
 
+import org.apache.iotdb.udf.api.IoTDBLocal;
 import org.apache.iotdb.udf.api.customizer.analysis.ScalarFunctionAnalysis;
 import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments;
 import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException;
@@ -54,6 +55,14 @@ public interface ScalarFunction extends SQLFunction {
     // do nothing
   }
 
+  /**
+   * Same as {@link #beforeStart(FunctionArguments)} with access to {@link 
IoTDBLocal} for embedded
+   * queries.
+   */
+  default void beforeStart(FunctionArguments arguments, IoTDBLocal local) 
throws UDFException {
+    beforeStart(arguments);
+  }
+
   /**
    * This method will be called to process the transformation. In a single UDF 
query, this method
    * may be called multiple times.
@@ -63,8 +72,18 @@ public interface ScalarFunction extends SQLFunction {
    */
   Object evaluate(Record input) throws UDFException;
 
+  /** Same as {@link #evaluate(Record)} with access to {@link IoTDBLocal} for 
embedded queries. */
+  default Object evaluate(Record input, IoTDBLocal local) throws UDFException {
+    return evaluate(input);
+  }
+
   /** This method is mainly used to release the resources used in the 
ScalarFunction. */
   default void beforeDestroy() {
     // do nothing
   }
+
+  /** Same as {@link #beforeDestroy()} with access to {@link IoTDBLocal}. */
+  default void beforeDestroy(IoTDBLocal local) {
+    beforeDestroy();
+  }
 }
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
index a25188361af..782222a1394 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
@@ -198,6 +198,7 @@ import 
org.apache.iotdb.commons.queryengine.plan.relational.type.TypeNotFoundExc
 import org.apache.iotdb.commons.queryengine.plan.udf.TableUDFUtils;
 import 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction;
 import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.udf.api.IoTDBLocal;
 import org.apache.iotdb.udf.api.customizer.analysis.ScalarFunctionAnalysis;
 import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments;
 import org.apache.iotdb.udf.api.relational.ScalarFunction;
@@ -1489,11 +1490,16 @@ public class ColumnTransformerBuilder
                     .collect(Collectors.toList()),
                 Collections.emptyMap());
         ScalarFunctionAnalysis analysis = scalarFunction.analyze(parameters);
-        scalarFunction.beforeStart(parameters);
+        Optional<IoTDBLocal> ioTDBLocal = context.getIoTDBLocal();
+        if (ioTDBLocal.isPresent()) {
+          scalarFunction.beforeStart(parameters, ioTDBLocal.get());
+        } else {
+          scalarFunction.beforeStart(parameters);
+        }
         Type returnType =
             
UDFDataTypeTransformer.transformUDFDataTypeToReadType(analysis.getOutputDataType());
         return new UserDefineScalarFunctionTransformer(
-            returnType, scalarFunction, childrenColumnTransformer);
+            returnType, scalarFunction, childrenColumnTransformer, 
ioTDBLocal.orElse(null));
       }
     }
     throw new IllegalArgumentException(
@@ -1954,6 +1960,8 @@ public class ColumnTransformerBuilder
     @SuppressWarnings("unused")
     private final Optional<MemoryReservationManager> memoryReservationManager;
 
+    private final Optional<IoTDBLocal> ioTDBLocal;
+
     public Context(
         SessionInfo sessionInfo,
         List<LeafColumnTransformer> leafList,
@@ -1966,6 +1974,34 @@ public class ColumnTransformerBuilder
         ITableTypeProvider typeProvider,
         ITypeMetadata metadata,
         @Nullable MemoryReservationManager memoryReservationManager) {
+      this(
+          sessionInfo,
+          leafList,
+          inputLocations,
+          cache,
+          hasSeen,
+          commonTransformerList,
+          inputDataTypes,
+          originSize,
+          typeProvider,
+          metadata,
+          memoryReservationManager,
+          Optional.empty());
+    }
+
+    public Context(
+        SessionInfo sessionInfo,
+        List<LeafColumnTransformer> leafList,
+        Map<Symbol, List<InputLocation>> inputLocations,
+        Map<Expression, ColumnTransformer> cache,
+        Map<Expression, ColumnTransformer> hasSeen,
+        List<ColumnTransformer> commonTransformerList,
+        List<TSDataType> inputDataTypes,
+        int originSize,
+        ITableTypeProvider typeProvider,
+        ITypeMetadata metadata,
+        @Nullable MemoryReservationManager memoryReservationManager,
+        Optional<IoTDBLocal> ioTDBLocal) {
       this.sessionInfo = sessionInfo;
       this.leafList = leafList;
       this.inputLocations = inputLocations;
@@ -1977,6 +2013,11 @@ public class ColumnTransformerBuilder
       this.typeProvider = typeProvider;
       this.metadata = metadata;
       this.memoryReservationManager = 
Optional.ofNullable(memoryReservationManager);
+      this.ioTDBLocal = ioTDBLocal;
+    }
+
+    public Optional<IoTDBLocal> getIoTDBLocal() {
+      return ioTDBLocal;
     }
 
     public Type getType(SymbolReference symbolReference) {
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
index bfac196461e..8b7f6cb714b 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
@@ -169,6 +169,7 @@ import 
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
 import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Literal;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.SymbolReference;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager;
+import org.apache.iotdb.udf.api.IoTDBLocal;
 import org.apache.iotdb.udf.api.relational.TableFunction;
 import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
 
@@ -313,6 +314,30 @@ public abstract class TableOperatorGenerator<
       Map<Symbol, List<InputLocation>> inputLocations,
       PlanNodeId planNodeId,
       C context) {
+    return constructFilterAndProjectOperator(
+        predicate,
+        inputOperator,
+        projectExpressions,
+        inputDataTypes,
+        inputLocations,
+        planNodeId,
+        context,
+        getIoTDBLocal(context, planNodeId));
+  }
+
+  protected Optional<IoTDBLocal> getIoTDBLocal(C context, PlanNodeId 
planNodeId) {
+    return Optional.empty();
+  }
+
+  protected Operator constructFilterAndProjectOperator(
+      Optional<Expression> predicate,
+      Operator inputOperator,
+      Expression[] projectExpressions,
+      List<TSDataType> inputDataTypes,
+      Map<Symbol, List<InputLocation>> inputLocations,
+      PlanNodeId planNodeId,
+      C context,
+      Optional<IoTDBLocal> ioTDBLocal) {
 
     final List<TSDataType> filterOutputDataTypes = new 
ArrayList<>(inputDataTypes);
 
@@ -341,7 +366,8 @@ public abstract class TableOperatorGenerator<
                           0,
                           context.getTableTypeProvider(),
                           metadata,
-                          context.getMemoryReservationManager());
+                          context.getMemoryReservationManager(),
+                          ioTDBLocal);
 
                   return visitor.process(p, filterColumnTransformerContext);
                 })
@@ -369,7 +395,8 @@ public abstract class TableOperatorGenerator<
             inputLocations.size(),
             context.getTableTypeProvider(),
             metadata,
-            context.getMemoryReservationManager());
+            context.getMemoryReservationManager(),
+            ioTDBLocal);
 
     for (Expression expression : projectExpressions) {
       projectOutputTransformerList.add(
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
index 4e22315b628..dd7c175774d 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.calc.transformation.dag.column.udf;
 import 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.RecordIterator;
 import org.apache.iotdb.calc.transformation.dag.column.ColumnTransformer;
 import 
org.apache.iotdb.calc.transformation.dag.column.multi.MultiColumnTransformer;
+import org.apache.iotdb.udf.api.IoTDBLocal;
 import org.apache.iotdb.udf.api.relational.ScalarFunction;
 import org.apache.iotdb.udf.api.relational.access.Record;
 
@@ -36,13 +37,16 @@ public class UserDefineScalarFunctionTransformer extends 
MultiColumnTransformer
 
   private final ScalarFunction scalarFunction;
   private final List<Type> inputTypes;
+  private final IoTDBLocal ioTDBLocal;
 
   public UserDefineScalarFunctionTransformer(
       Type returnType,
       ScalarFunction scalarFunction,
-      List<ColumnTransformer> childrenTransformers) {
+      List<ColumnTransformer> childrenTransformers,
+      IoTDBLocal ioTDBLocal) {
     super(returnType, childrenTransformers);
     this.scalarFunction = scalarFunction;
+    this.ioTDBLocal = ioTDBLocal;
     this.inputTypes =
         
childrenTransformers.stream().map(ColumnTransformer::getType).collect(Collectors.toList());
   }
@@ -53,7 +57,10 @@ public class UserDefineScalarFunctionTransformer extends 
MultiColumnTransformer
     RecordIterator iterator = new RecordIterator(childrenColumns, inputTypes, 
positionCount);
     while (iterator.hasNext()) {
       try {
-        Object result = scalarFunction.evaluate(iterator.next());
+        Object result =
+            ioTDBLocal != null
+                ? scalarFunction.evaluate(iterator.next(), ioTDBLocal)
+                : scalarFunction.evaluate(iterator.next());
         if (result == null) {
           builder.appendNull();
         } else {
@@ -80,7 +87,10 @@ public class UserDefineScalarFunctionTransformer extends 
MultiColumnTransformer
           builder.appendNull();
           continue;
         }
-        Object result = scalarFunction.evaluate(input);
+        Object result =
+            ioTDBLocal != null
+                ? scalarFunction.evaluate(input, ioTDBLocal)
+                : scalarFunction.evaluate(input);
         if (result == null) {
           builder.appendNull();
         } else {
@@ -98,7 +108,12 @@ public class UserDefineScalarFunctionTransformer extends 
MultiColumnTransformer
   @Override
   public void close() {
     super.close();
-    scalarFunction.beforeDestroy();
+    if (ioTDBLocal != null) {
+      scalarFunction.beforeDestroy(ioTDBLocal);
+      ioTDBLocal.close();
+    } else {
+      scalarFunction.beforeDestroy();
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 54ac31ce7f7..1aa14e053ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -562,7 +562,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     }
   }
 
-  private void clearUp(
+  private static void clearUp(
       IClientSession clientSession,
       Long statementId,
       Long queryId,
@@ -572,7 +572,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     clientSession.removeQueryId(statementId, queryId);
   }
 
-  private void clearUp(
+  private static void clearUp(
       IClientSession clientSession,
       Long statementId,
       Long queryId,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 73a09597cd5..45b8643d02c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.queryengine.plan;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
@@ -30,6 +29,7 @@ import 
org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.SemanticException;
 import org.apache.iotdb.commons.memory.IMemoryBlock;
 import org.apache.iotdb.commons.memory.MemoryBlockType;
@@ -155,6 +155,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite.StatementRewr
 import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import org.apache.iotdb.db.queryengine.udf.InternalQueryExecutor;
 import org.apache.iotdb.db.utils.SetThreadName;
 
 import org.apache.thrift.TBase;
@@ -307,6 +308,18 @@ public class Coordinator {
       boolean userQuery,
       boolean debug,
       BiFunction<MPPQueryContext, Long, IQueryExecution> 
iQueryExecutionFactory) {
+    return execution(queryId, session, sql, userQuery, debug, false, 
iQueryExecutionFactory);
+  }
+
+  private ExecutionResult execution(
+      long queryId,
+      SessionInfo session,
+      String sql,
+      boolean userQuery,
+      boolean debug,
+      boolean readOnlyInternalQuery,
+      BiFunction<MPPQueryContext, Long, IQueryExecution> 
iQueryExecutionFactory)
+      throws IoTDBException {
     long startTime = System.currentTimeMillis();
     QueryId globalQueryId = queryIdGenerator.createNextQueryId();
     MPPQueryContext queryContext = null;
@@ -325,6 +338,9 @@ public class Coordinator {
       queryContext.setUserQuery(userQuery);
       queryContext.setDebug(debug);
       IQueryExecution execution = iQueryExecutionFactory.apply(queryContext, 
startTime);
+      if (readOnlyInternalQuery) {
+        InternalQueryExecutor.validateReadOnlyQuery(execution);
+      }
       if (execution.isQuery()) {
         queryExecutionMap.put(queryId, execution);
       } else {
@@ -488,12 +504,40 @@ public class Coordinator {
       long timeOut,
       boolean userQuery,
       boolean debug) {
+    return executeForTableModel(
+        statement,
+        sqlParser,
+        clientSession,
+        queryId,
+        session,
+        sql,
+        metadata,
+        timeOut,
+        userQuery,
+        debug,
+        false);
+  }
+
+  public ExecutionResult executeForTableModel(
+      org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement 
statement,
+      SqlParser sqlParser,
+      IClientSession clientSession,
+      long queryId,
+      SessionInfo session,
+      String sql,
+      Metadata metadata,
+      long timeOut,
+      boolean userQuery,
+      boolean debug,
+      boolean readOnlyInternalQuery)
+      throws IoTDBException {
     return execution(
         queryId,
         session,
         sql,
         userQuery,
         debug,
+        readOnlyInternalQuery,
         ((queryContext, startTime) ->
             createQueryExecutionForTableModel(
                 statement,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
index 185c0d12c06..9a2e35a8c1e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
@@ -68,7 +68,6 @@ import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
 import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
-import org.apache.iotdb.db.queryengine.execution.operator.EmptyDataOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.ExplainAnalyzeOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.TableIntoOperator;
@@ -132,13 +131,15 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.Table
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryCountNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryScanNode;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.queryengine.udf.IoTDBLocalImpl;
+import org.apache.iotdb.db.queryengine.udf.ScalarUdfExpressionDetector;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils;
+import org.apache.iotdb.udf.api.IoTDBLocal;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.idcolumn.FourOrHigherLevelDBExtractor;
@@ -2101,4 +2102,36 @@ public class DataNodeTableOperatorGenerator
   protected SessionInfo getSessionInfo(LocalExecutionPlanContext context) {
     return 
context.getDriverContext().getFragmentInstanceContext().getSessionInfo();
   }
+
+  @Override
+  protected Optional<IoTDBLocal> getIoTDBLocal(
+      LocalExecutionPlanContext context, PlanNodeId planNodeId) {
+    return Optional.of(
+        IoTDBLocalImpl.create(
+            getSessionInfo(context), context.getFragmentInstanceId(), 
planNodeId));
+  }
+
+  @Override
+  protected Operator constructFilterAndProjectOperator(
+      Optional<Expression> predicate,
+      Operator inputOperator,
+      Expression[] projectExpressions,
+      List<TSDataType> inputDataTypes,
+      Map<Symbol, List<InputLocation>> inputLocations,
+      PlanNodeId planNodeId,
+      LocalExecutionPlanContext context) {
+    Optional<IoTDBLocal> ioTDBLocal =
+        ScalarUdfExpressionDetector.contains(predicate, projectExpressions)
+            ? getIoTDBLocal(context, planNodeId)
+            : Optional.empty();
+    return constructFilterAndProjectOperator(
+        predicate,
+        inputOperator,
+        projectExpressions,
+        inputDataTypes,
+        inputLocations,
+        planNodeId,
+        context,
+        ioTDBLocal);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java
new file mode 100644
index 00000000000..5a31c5910f7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.udf;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.QueryTimeoutException;
+import org.apache.iotdb.commons.exception.SemanticException;
+import org.apache.iotdb.commons.queryengine.common.SessionInfo;
+import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+/** Stateless utility for UDF embedded table-model queries. */
+public final class InternalQueryExecutor {
+
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+  private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
+  private static final Metadata METADATA = 
LocalExecutionPlanner.getInstance().metadata;
+  private static final SqlParser RELATION_SQL_PARSER = new SqlParser();
+
+  private InternalQueryExecutor() {}
+
+  public static long computeRemainingTimeoutMs(
+      long outerQueryStartTimeMs, long outerQueryTimeoutMs) {
+    return outerQueryTimeoutMs - (System.currentTimeMillis() - 
outerQueryStartTimeMs);
+  }
+
+  public static InternalQueryResult executeInternalQuery(
+      IClientSession internalSession,
+      SessionInfo sessionInfo,
+      String sql,
+      long outerQueryStartTimeMs,
+      long outerQueryTimeoutMs)
+      throws IoTDBException {
+
+    long timeoutMs = computeRemainingTimeoutMs(outerQueryStartTimeMs, 
outerQueryTimeoutMs);
+    if (timeoutMs <= 0) {
+      throw new QueryTimeoutException(
+          "Outer query timeout exceeded before UDF internal query starts");
+    }
+
+    Statement parsedStatement = parseTableStatement(internalSession, 
sessionInfo, sql);
+
+    long statementId = SESSION_MANAGER.requestStatementId(internalSession);
+    long queryId = SESSION_MANAGER.requestQueryId(internalSession, 
statementId);
+
+    ExecutionResult result =
+        COORDINATOR.executeForTableModel(
+            parsedStatement,
+            RELATION_SQL_PARSER,
+            internalSession,
+            queryId,
+            sessionInfo,
+            sql,
+            METADATA,
+            timeoutMs,
+            false,
+            false,
+            true);
+
+    if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+      COORDINATOR.cleanupQueryExecution(queryId, () -> sql, null);
+      internalSession.removeQueryId(statementId, queryId);
+      throw new IoTDBException(result.status.message, result.status.code);
+    }
+
+    IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+    if (queryExecution == null) {
+      COORDINATOR.cleanupQueryExecution(queryId, () -> sql, null);
+      internalSession.removeQueryId(statementId, queryId);
+      throw new IoTDBException(
+          "Internal query execution not found", 
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    }
+
+    return new InternalQueryResult(
+        queryExecution,
+        () -> {
+          COORDINATOR.cleanupQueryExecution(queryId, () -> sql, null);
+          internalSession.removeQueryId(statementId, queryId);
+        });
+  }
+
+  static void validateReadOnlyQuery(IQueryExecution execution) throws 
IoTDBException {
+    if (execution.getQueryType() != QueryType.READ) {
+      execution.stopAndCleanup(null);
+      throw new SemanticException("Only query is allowed when used IoTDBLocal 
in UDF");
+    }
+  }
+
+  private static Statement parseTableStatement(
+      IClientSession internalSession, SessionInfo sessionInfo, String sql) 
throws IoTDBException {
+    try {
+      Statement statement =
+          RELATION_SQL_PARSER.createStatement(sql, sessionInfo.getZoneId(), 
internalSession);
+      return statement;
+    } catch (Exception e) {
+      throw new IoTDBException(e.getMessage(), 
TSStatusCode.SQL_PARSE_ERROR.getStatusCode());
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java
new file mode 100644
index 00000000000..9f35fa95618
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.udf;
+
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
+
+/** Internal query result holding {@link IQueryExecution} and a cleanup 
action. */
+public final class InternalQueryResult implements AutoCloseable {
+
+  private final IQueryExecution queryExecution;
+  private final Runnable releaseAction;
+
+  public InternalQueryResult(IQueryExecution queryExecution, Runnable 
releaseAction) {
+    this.queryExecution = queryExecution;
+    this.releaseAction = releaseAction;
+  }
+
+  public IQueryExecution getQueryExecution() {
+    return queryExecution;
+  }
+
+  @Override
+  public void close() {
+    releaseAction.run();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
new file mode 100644
index 00000000000..32540b20e06
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.udf;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.queryengine.common.SessionInfo;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
+import org.apache.iotdb.udf.api.IoTDBLocal;
+import org.apache.iotdb.udf.api.UDFResultSet;
+import org.apache.iotdb.udf.api.exception.UDFException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Server-side implementation of {@link IoTDBLocal}. */
+public class IoTDBLocalImpl implements IoTDBLocal {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBLocalImpl.class);
+  private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+
+  private final SessionInfo sessionInfo;
+  private final InternalClientSession internalSession;
+  private final long outerQueryStartTimeMs;
+  private final long outerQueryTimeoutMs;
+  private final List<UDFResultSetImpl> openResultSets = new ArrayList<>();
+  private boolean closed;
+
+  public IoTDBLocalImpl(
+      SessionInfo sessionInfo,
+      String internalClientId,
+      long outerQueryStartTimeMs,
+      long outerQueryTimeoutMs) {
+    this.sessionInfo = sessionInfo;
+    this.outerQueryStartTimeMs = outerQueryStartTimeMs;
+    this.outerQueryTimeoutMs = outerQueryTimeoutMs;
+    this.internalSession = new InternalClientSession(internalClientId);
+    internalSession.setSqlDialect(sessionInfo.getSqlDialect());
+    sessionInfo.getDatabaseName().ifPresent(internalSession::setDatabaseName);
+    SESSION_MANAGER.supplySession(
+        internalSession,
+        sessionInfo.getUserId(),
+        sessionInfo.getUserName(),
+        sessionInfo.getZoneId(),
+        ClientVersion.V_1_0);
+  }
+
+  public static String formatInternalClientId(
+      FragmentInstanceId fragmentInstanceId, PlanNodeId planNodeId) {
+    return "udf-local-" + fragmentInstanceId + "-" + planNodeId;
+  }
+
+  public static IoTDBLocalImpl create(
+      SessionInfo sessionInfo, FragmentInstanceId fragmentInstanceId, 
PlanNodeId planNodeId) {
+    long outerStart = System.currentTimeMillis();
+    long outerTimeout =
+        org.apache.iotdb.db.conf.IoTDBDescriptor.getInstance()
+            .getConfig()
+            .getQueryTimeoutThreshold();
+    String globalQueryId = fragmentInstanceId.getQueryId().getId();
+    for (IQueryExecution execution : COORDINATOR.getAllQueryExecutions()) {
+      if (globalQueryId.equals(execution.getQueryId())) {
+        outerStart = execution.getStartExecutionTime();
+        outerTimeout = execution.getTimeout();
+        break;
+      }
+    }
+    return new IoTDBLocalImpl(
+        sessionInfo,
+        formatInternalClientId(fragmentInstanceId, planNodeId),
+        outerStart,
+        outerTimeout);
+  }
+
+  @Override
+  public UDFResultSet query(String sql) throws UDFException {
+    if (closed) {
+      throw new UDFException("IoTDBLocal is already closed");
+    }
+    try {
+      InternalQueryResult result =
+          InternalQueryExecutor.executeInternalQuery(
+              internalSession, sessionInfo, sql, outerQueryStartTimeMs, 
outerQueryTimeoutMs);
+      int index = openResultSets.size();
+      UDFResultSetImpl rs = new UDFResultSetImpl(result, this, index);
+      openResultSets.add(rs);
+      return rs;
+    } catch (IoTDBException e) {
+      throw new UDFException(e.getMessage(), e);
+    }
+  }
+
+  void markResultSetClosed(int index) {
+    if (index >= 0 && index < openResultSets.size()) {
+      openResultSets.set(index, null);
+    }
+  }
+
+  public void closeAllResultSets() {
+    for (UDFResultSetImpl rs : openResultSets) {
+      if (rs != null) {
+        rs.close();
+      }
+    }
+    openResultSets.clear();
+  }
+
+  @Override
+  public void close() {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    closeAllResultSets();
+    SESSION_MANAGER.closeSession(internalSession, 
COORDINATOR::cleanupQueryExecution);
+  }
+
+  @Override
+  public void info(String msg) {
+    LOGGER.info(msg);
+  }
+
+  @Override
+  public void info(String format, Object... args) {
+    LOGGER.info(format, args);
+  }
+
+  @Override
+  public void info(String msg, Throwable t) {
+    LOGGER.info(msg, t);
+  }
+
+  @Override
+  public void warn(String msg) {
+    LOGGER.warn(msg);
+  }
+
+  @Override
+  public void warn(String format, Object... args) {
+    LOGGER.warn(format, args);
+  }
+
+  @Override
+  public void warn(String msg, Throwable t) {
+    LOGGER.warn(msg, t);
+  }
+
+  @Override
+  public void error(String msg) {
+    LOGGER.error(msg);
+  }
+
+  @Override
+  public void error(String format, Object... args) {
+    LOGGER.error(format, args);
+  }
+
+  @Override
+  public void error(String msg, Throwable t) {
+    LOGGER.error(msg, t);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/ScalarUdfExpressionDetector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/ScalarUdfExpressionDetector.java
new file mode 100644
index 00000000000..cf87bee846a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/ScalarUdfExpressionDetector.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.udf;
+
+import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
+import 
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.FunctionCall;
+import org.apache.iotdb.commons.queryengine.plan.udf.TableUDFUtils;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DefaultTraversalVisitor;
+
+import java.util.Optional;
+
+/** Detects whether filter or project expressions contain a user-defined 
scalar function. */
+public final class ScalarUdfExpressionDetector extends 
DefaultTraversalVisitor<Void> {
+
+  private boolean found;
+
+  private ScalarUdfExpressionDetector() {}
+
+  public static boolean contains(Optional<Expression> predicate, Expression[] 
projectExpressions) {
+    ScalarUdfExpressionDetector detector = new ScalarUdfExpressionDetector();
+    if (predicate.isPresent()) {
+      detector.process(predicate.get(), null);
+    }
+    for (Expression expression : projectExpressions) {
+      if (detector.found) {
+        return true;
+      }
+      detector.process(expression, null);
+    }
+    return detector.found;
+  }
+
+  @Override
+  public Void visitFunctionCall(FunctionCall node, Void context) {
+    if (TableUDFUtils.isScalarFunction(node.getName().getSuffix())) {
+      found = true;
+      return null;
+    }
+    return super.visitFunctionCall(node, context);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java
new file mode 100644
index 00000000000..647e0115a64
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.udf;
+
+import 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.RecordIterator;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.schema.column.ColumnHeader;
+import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
+import org.apache.iotdb.udf.api.UDFResultSet;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.access.Record;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.type.Type;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Server-side implementation of {@link UDFResultSet}. */
+public class UDFResultSetImpl implements UDFResultSet {
+
+  private final InternalQueryResult queryResult;
+  private final IoTDBLocalImpl owner;
+  private final int index;
+
+  private Iterator<Record> rowIterator;
+  private final List<Type> columnTypes;
+  private boolean closed;
+
+  public UDFResultSetImpl(InternalQueryResult queryResult, IoTDBLocalImpl 
owner, int index) {
+    this.queryResult = queryResult;
+    this.owner = owner;
+    this.index = index;
+    this.columnTypes = buildColumnTypes(queryResult.getQueryExecution());
+  }
+
+  @Override
+  public boolean hasNext() throws UDFException {
+    ensureOpen();
+
+    while (rowIterator == null || !rowIterator.hasNext()) {
+      if (!queryResult.getQueryExecution().hasNextResult()) {
+        return false;
+      }
+      Optional<TsBlock> batch;
+      try {
+        batch = queryResult.getQueryExecution().getBatchResult();
+      } catch (IoTDBException e) {
+        throw new UDFException(e.getMessage());
+      }
+      if (!batch.isPresent()) {
+        return false;
+      }
+      TsBlock currentBlock = batch.get();
+      rowIterator =
+          new RecordIterator(
+              Arrays.asList(currentBlock.getValueColumns()),
+              columnTypes,
+              currentBlock.getPositionCount());
+    }
+    return true;
+  }
+
+  @Override
+  public Record next() throws UDFException {
+    ensureOpen();
+
+    if (rowIterator == null || !rowIterator.hasNext()) {
+      throw new NoSuchElementException();
+    }
+    return rowIterator.next();
+  }
+
+  @Override
+  public void close() throws UDFException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      queryResult.close();
+    } catch (RuntimeException e) {
+      throw new UDFException("Failed to close internal query result", e);
+    }
+    owner.markResultSetClosed(index);
+  }
+
+  private void ensureOpen() throws UDFException {
+    if (closed) {
+      throw new UDFException("UDFResultSet is already closed");
+    }
+  }
+
+  private static List<Type> buildColumnTypes(IQueryExecution queryExecution) {
+    return queryExecution.getDatasetHeader().getColumnHeaders().stream()
+        .map(ColumnHeader::getColumnType)
+        .map(UDFDataTypeTransformer::transformToUDFDataType)
+        .map(UDFDataTypeTransformer::transformUDFDataTypeToReadType)
+        .collect(Collectors.toList());
+  }
+}

Reply via email to