This is an automated email from the ASF dual-hosted git repository.

Wei-hao-Li pushed a commit to branch IoTDBLocal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5f18fc57d36fa575dd5ce4f3314a3bfe96a3e009
Author: Weihao Li <[email protected]>
AuthorDate: Fri Jun 26 00:12:24 2026 +0800

    fix timeout
    
    Signed-off-by: Weihao Li <[email protected]>
---
 .../relational/ColumnTransformerBuilder.java       | 19 ++++++++++----
 .../calc/plan/planner/TableOperatorGenerator.java  | 29 ++++++++++++++++------
 .../udf/UserDefineScalarFunctionTransformer.java   |  8 ++++--
 .../fragment/FragmentInstanceContext.java          | 11 ++++++++
 .../fragment/FragmentInstanceManager.java          |  1 +
 .../planner/DataNodeTableOperatorGenerator.java    |  5 ++++
 .../plan/planner/LocalExecutionPlanContext.java    |  4 +++
 .../SimpleFragmentParallelPlanner.java             |  1 +
 .../plan/planner/plan/FragmentInstance.java        | 15 +++++++++++
 .../distribute/TableModelQueryFragmentPlanner.java |  1 +
 .../iotdb/db/queryengine/udf/IoTDBLocalImpl.java   | 29 +++++++++++-----------
 11 files changed, 95 insertions(+), 28 deletions(-)

diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
index ba0464b0777..20d35fe0ab3 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
@@ -1956,7 +1956,9 @@ public class ColumnTransformerBuilder
 
     private final String fragmentInstanceId;
 
-    private final String outerQueryId;
+    private final String outerGlobalQueryId;
+
+    private final long outerLocalQueryId;
 
     @Nullable private final IoTDBLocalFactory ioTDBLocalFactory;
 
@@ -1986,6 +1988,7 @@ public class ColumnTransformerBuilder
           memoryReservationManager,
           null,
           null,
+          -1L,
           null);
     }
 
@@ -2002,7 +2005,8 @@ public class ColumnTransformerBuilder
         ITypeMetadata metadata,
         @Nullable MemoryReservationManager memoryReservationManager,
         String fragmentInstanceId,
-        String outerQueryId,
+        String outerGlobalQueryId,
+        long outerLocalQueryId,
         @Nullable IoTDBLocalFactory ioTDBLocalFactory) {
       this.sessionInfo = sessionInfo;
       this.leafList = leafList;
@@ -2016,7 +2020,8 @@ public class ColumnTransformerBuilder
       this.metadata = metadata;
       this.memoryReservationManager = 
Optional.ofNullable(memoryReservationManager);
       this.fragmentInstanceId = fragmentInstanceId;
-      this.outerQueryId = outerQueryId;
+      this.outerGlobalQueryId = outerGlobalQueryId;
+      this.outerLocalQueryId = outerLocalQueryId;
       this.ioTDBLocalFactory = ioTDBLocalFactory;
     }
 
@@ -2028,8 +2033,12 @@ public class ColumnTransformerBuilder
       return fragmentInstanceId;
     }
 
-    public String getOuterQueryId() {
-      return outerQueryId;
+    public String getOuterGlobalQueryId() {
+      return outerGlobalQueryId;
+    }
+
+    public long getOuterLocalQueryId() {
+      return outerLocalQueryId;
     }
 
     @Nullable
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
index d423db6dba6..f12dadbb87a 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
@@ -317,7 +317,8 @@ public abstract class TableOperatorGenerator<
       C context) {
 
     String fragmentInstanceId = getFragmentInstanceId(context);
-    String outerQueryId = getQueryId(context);
+    String outerGlobalQueryId = getQueryId(context);
+    long outerLocalQueryId = getLocalQueryId(context);
     IoTDBLocalFactory ioTDBLocalFactory = getIoTDBLocalFactory(context);
 
     final List<TSDataType> filterOutputDataTypes = new 
ArrayList<>(inputDataTypes);
@@ -349,7 +350,8 @@ public abstract class TableOperatorGenerator<
                           metadata,
                           context.getMemoryReservationManager(),
                           fragmentInstanceId,
-                          outerQueryId,
+                          outerGlobalQueryId,
+                          outerLocalQueryId,
                           ioTDBLocalFactory);
 
                   return visitor.process(p, filterColumnTransformerContext);
@@ -380,7 +382,8 @@ public abstract class TableOperatorGenerator<
             metadata,
             context.getMemoryReservationManager(),
             fragmentInstanceId,
-            outerQueryId,
+            outerGlobalQueryId,
+            outerLocalQueryId,
             ioTDBLocalFactory);
 
     for (Expression expression : projectExpressions) {
@@ -413,6 +416,10 @@ public abstract class TableOperatorGenerator<
     return null;
   }
 
+  protected long getLocalQueryId(C context) {
+    return -1L;
+  }
+
   protected IoTDBLocalFactory getIoTDBLocalFactory(C context) {
     return null;
   }
@@ -2515,18 +2522,26 @@ public abstract class TableOperatorGenerator<
   protected IoTDBLocal createIoTDBLocal(C context) {
     IoTDBLocalFactory factory = getIoTDBLocalFactory(context);
     String fragmentInstanceId = getFragmentInstanceId(context);
-    String queryId = getQueryId(context);
+    String outerGlobalQueryId = getQueryId(context);
+    long outerLocalQueryId = getLocalQueryId(context);
     checkArgument(factory != null, "IoTDBLocalFactory must not be null for UDF 
execution");
     checkArgument(
         fragmentInstanceId != null, "fragmentInstanceId must not be null for 
UDF execution");
-    checkArgument(queryId != null, "queryId must not be null for UDF 
execution");
-    return factory.create(getSessionInfo(context), fragmentInstanceId, 
queryId);
+    checkArgument(outerGlobalQueryId != null, "queryId must not be null for 
UDF execution");
+    checkArgument(
+        outerLocalQueryId >= 0, "outerLocalQueryId must not be negative for 
UDF execution");
+    return factory.create(
+        getSessionInfo(context), fragmentInstanceId, outerLocalQueryId, 
outerGlobalQueryId);
   }
 
   /** Factory for creating {@link IoTDBLocal} inside UDF column transformers. 
*/
   @FunctionalInterface
   public interface IoTDBLocalFactory {
 
-    IoTDBLocal create(SessionInfo sessionInfo, String fragmentInstanceId, 
String queryId);
+    IoTDBLocal create(
+        SessionInfo sessionInfo,
+        String fragmentInstanceId,
+        long outerLocalQueryId,
+        String outerGlobalQueryId);
   }
 }
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
index ab2f99031d2..07d92d967f8 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
@@ -72,11 +72,15 @@ public class UserDefineScalarFunctionTransformer extends 
MultiColumnTransformer
     IoTDBLocalFactory factory = context.getIoTDBLocalFactory();
     if (factory == null
         || context.getFragmentInstanceId() == null
-        || context.getOuterQueryId() == null) {
+        || context.getOuterGlobalQueryId() == null
+        || context.getOuterLocalQueryId() < 0) {
       return null;
     }
     return factory.create(
-        context.getSessionInfo(), context.getFragmentInstanceId(), 
context.getOuterQueryId());
+        context.getSessionInfo(),
+        context.getFragmentInstanceId(),
+        context.getOuterLocalQueryId(),
+        context.getOuterGlobalQueryId());
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 6bda6e9c14d..a41660ef556 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -152,6 +152,9 @@ public class FragmentInstanceContext extends QueryContext {
   // session info
   private SessionInfo sessionInfo;
 
+  // Coordinator-local query id of the outer query, used by IoTDBLocal UDF
+  private long localQueryId = -1L;
+
   private final Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap;
   private DataNodeQueryContext dataNodeQueryContext;
 
@@ -206,6 +209,7 @@ public class FragmentInstanceContext extends QueryContext {
       IDataRegionForQuery dataRegion,
       TimePredicate globalTimePredicate,
       Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap,
+      long localQueryId,
       boolean debug,
       boolean isVerbose) {
     FragmentInstanceContext instanceContext =
@@ -216,6 +220,7 @@ public class FragmentInstanceContext extends QueryContext {
             dataRegion,
             globalTimePredicate,
             dataNodeQueryContextMap,
+            localQueryId,
             debug,
             isVerbose);
     instanceContext.initialize();
@@ -271,6 +276,7 @@ public class FragmentInstanceContext extends QueryContext {
       IDataRegionForQuery dataRegion,
       TimePredicate globalTimePredicate,
       Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap,
+      long localQueryId,
       boolean debug,
       boolean verbose) {
     super(debug, verbose);
@@ -278,6 +284,7 @@ public class FragmentInstanceContext extends QueryContext {
     this.stateMachine = stateMachine;
     this.executionEndTime.set(END_TIME_INITIAL_VALUE);
     this.sessionInfo = sessionInfo;
+    this.localQueryId = localQueryId;
     this.dataRegion = dataRegion;
     this.globalTimeFilter =
         globalTimePredicate == null
@@ -567,6 +574,10 @@ public class FragmentInstanceContext extends QueryContext {
     return sessionInfo;
   }
 
+  public long getLocalQueryId() {
+    return localQueryId;
+  }
+
   public Optional<Throwable> getFailureCause() {
     return Optional.ofNullable(
         stateMachine.getFailureCauses().stream()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index e5f3cbc7e0d..75e447de586 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -167,6 +167,7 @@ public class FragmentInstanceManager {
                               dataRegion,
                               instance.getGlobalTimePredicate(),
                               dataNodeQueryContextMap,
+                              instance.getLocalQueryId(),
                               instance.isDebug(),
                               instance.isVerbose());
                         });
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
index f7d1e9dc9fd..aa748cde854 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
@@ -2115,6 +2115,11 @@ public class DataNodeTableOperatorGenerator
     return context.getFragmentInstanceId().getQueryId().getId();
   }
 
+  @Override
+  protected long getLocalQueryId(LocalExecutionPlanContext context) {
+    return context.getLocalQueryId();
+  }
+
   @Override
   protected IoTDBLocalFactory getIoTDBLocalFactory(LocalExecutionPlanContext 
context) {
     return IoTDBLocalImpl.FACTORY;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
index 65908708394..c9172bb44ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
@@ -198,6 +198,10 @@ public class LocalExecutionPlanContext implements 
ITableOperatorGeneratorContext
     return driverContext.getFragmentInstanceContext().getId();
   }
 
+  public long getLocalQueryId() {
+    return driverContext.getFragmentInstanceContext().getLocalQueryId();
+  }
+
   public List<PipelineDriverFactory> getPipelineDriverFactories() {
     return pipelineDriverFactories;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 4e34d361471..dec7c1917b6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -148,6 +148,7 @@ public class SimpleFragmentParallelPlanner extends 
AbstractFragmentParallelPlann
             queryContext.isDebug(),
             fragment.isRoot(),
             queryContext.isVerbose());
+    fragmentInstance.setLocalQueryId(queryContext.getLocalQueryId());
 
     selectExecutorAndHost(
         fragment,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
index 463aeb54131..4cc98ee7d06 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
@@ -83,6 +83,9 @@ public class FragmentInstance implements IConsensusRequest {
   private final boolean debug;
   private final boolean verbose;
 
+  // Coordinator-local query id, used to look up IQueryExecution on DataNode
+  private long localQueryId = -1L;
+
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
@@ -269,6 +272,9 @@ public class FragmentInstance implements IConsensusRequest {
         hasHostDataNode ? 
ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
     fragmentInstance.isExplainAnalyze = ReadWriteIOUtils.readBool(buffer);
     fragmentInstance.setHighestPriority(ReadWriteIOUtils.readBool(buffer));
+    if (buffer.hasRemaining()) {
+      fragmentInstance.setLocalQueryId(ReadWriteIOUtils.readLong(buffer));
+    }
     return fragmentInstance;
   }
 
@@ -296,6 +302,7 @@ public class FragmentInstance implements IConsensusRequest {
       }
       ReadWriteIOUtils.write(isExplainAnalyze, outputStream);
       ReadWriteIOUtils.write(isHighestPriority, outputStream);
+      ReadWriteIOUtils.write(localQueryId, outputStream);
       return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
     } catch (IOException e) {
       LOGGER.error(
@@ -342,6 +349,14 @@ public class FragmentInstance implements IConsensusRequest 
{
     return sessionInfo;
   }
 
+  public long getLocalQueryId() {
+    return localQueryId;
+  }
+
+  public void setLocalQueryId(long localQueryId) {
+    this.localQueryId = localQueryId;
+  }
+
   public boolean isExplainAnalyze() {
     return isExplainAnalyze;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
index f2942536cb5..e9695ec73f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
@@ -186,6 +186,7 @@ public class TableModelQueryFragmentPlanner extends 
AbstractFragmentParallelPlan
             queryContext.isDebug(),
             fragment.isRoot(),
             queryContext.isVerbose());
+    fragmentInstance.setLocalQueryId(queryContext.getLocalQueryId());
 
     selectExecutorAndHost(
         fragment,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
index 47ee7d07f27..940288a83ac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
@@ -23,7 +23,6 @@ import 
org.apache.iotdb.calc.plan.planner.TableOperatorGenerator.IoTDBLocalFacto
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.QueryTimeoutException;
 import org.apache.iotdb.commons.queryengine.common.SessionInfo;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
@@ -47,13 +46,19 @@ public class IoTDBLocalImpl implements IoTDBLocal {
 
   private final SessionInfo sessionInfo;
   private final String fragmentInstanceId;
-  private final QueryId outerQueryId;
+  private final long outerLocalQueryId;
+  private final QueryId outerGlobalQueryId;
   private final List<UDFResultSetImpl> openResultSets = new ArrayList<>();
 
-  public IoTDBLocalImpl(SessionInfo sessionInfo, String fragmentInstanceId, 
String outerQueryId) {
+  public IoTDBLocalImpl(
+      SessionInfo sessionInfo,
+      String fragmentInstanceId,
+      long outerLocalQueryId,
+      String outerGlobalQueryId) {
     this.sessionInfo = sessionInfo;
     this.fragmentInstanceId = fragmentInstanceId;
-    this.outerQueryId = QueryId.valueOf(outerQueryId);
+    this.outerLocalQueryId = outerLocalQueryId;
+    this.outerGlobalQueryId = QueryId.valueOf(outerGlobalQueryId);
   }
 
   @Override
@@ -66,7 +71,7 @@ public class IoTDBLocalImpl implements IoTDBLocal {
       }
       InternalQueryResult result =
           InternalQueryExecutor.executeInternalQuery(
-              sessionInfo, fragmentInstanceId, outerQueryId, sql, timeoutMs);
+              sessionInfo, fragmentInstanceId, outerGlobalQueryId, sql, 
timeoutMs);
       int index = openResultSets.size();
       UDFResultSetImpl rs = new UDFResultSetImpl(openResultSets, index, 
result);
       openResultSets.add(rs);
@@ -92,16 +97,12 @@ public class IoTDBLocalImpl implements IoTDBLocal {
   }
 
   private long computeRemainingTimeoutMs() {
-    long outerStart = System.currentTimeMillis();
-    long outerTimeout = 
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
-    for (IQueryExecution execution : COORDINATOR.getAllQueryExecutions()) {
-      if (outerQueryId.getId().equals(execution.getQueryId())) {
-        outerStart = execution.getStartExecutionTime();
-        outerTimeout = execution.getTimeout();
-        break;
-      }
+    IQueryExecution execution = 
COORDINATOR.getQueryExecution(outerLocalQueryId);
+    if (execution == null) {
+      return 0;
     }
-    return outerTimeout - (System.currentTimeMillis() - outerStart);
+    return execution.getTimeout()
+        - (System.currentTimeMillis() - execution.getStartExecutionTime());
   }
 
   @Override

Reply via email to