This is an automated email from the ASF dual-hosted git repository. Wei-hao-Li pushed a commit to branch IoTDBLocal in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5f18fc57d36fa575dd5ce4f3314a3bfe96a3e009 Author: Weihao Li <[email protected]> AuthorDate: Fri Jun 26 00:12:24 2026 +0800 fix timeout Signed-off-by: Weihao Li <[email protected]> --- .../relational/ColumnTransformerBuilder.java | 19 ++++++++++---- .../calc/plan/planner/TableOperatorGenerator.java | 29 ++++++++++++++++------ .../udf/UserDefineScalarFunctionTransformer.java | 8 ++++-- .../fragment/FragmentInstanceContext.java | 11 ++++++++ .../fragment/FragmentInstanceManager.java | 1 + .../planner/DataNodeTableOperatorGenerator.java | 5 ++++ .../plan/planner/LocalExecutionPlanContext.java | 4 +++ .../SimpleFragmentParallelPlanner.java | 1 + .../plan/planner/plan/FragmentInstance.java | 15 +++++++++++ .../distribute/TableModelQueryFragmentPlanner.java | 1 + .../iotdb/db/queryengine/udf/IoTDBLocalImpl.java | 29 +++++++++++----------- 11 files changed, 95 insertions(+), 28 deletions(-) diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java index ba0464b0777..20d35fe0ab3 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java @@ -1956,7 +1956,9 @@ public class ColumnTransformerBuilder private final String fragmentInstanceId; - private final String outerQueryId; + private final String outerGlobalQueryId; + + private final long outerLocalQueryId; @Nullable private final IoTDBLocalFactory ioTDBLocalFactory; @@ -1986,6 +1988,7 @@ public class ColumnTransformerBuilder memoryReservationManager, null, null, + -1L, null); } @@ -2002,7 +2005,8 @@ public class ColumnTransformerBuilder ITypeMetadata metadata, @Nullable MemoryReservationManager memoryReservationManager, String fragmentInstanceId, - String outerQueryId, + String outerGlobalQueryId, + long outerLocalQueryId, @Nullable IoTDBLocalFactory ioTDBLocalFactory) { this.sessionInfo = sessionInfo; this.leafList = leafList; @@ -2016,7 +2020,8 @@ public class ColumnTransformerBuilder this.metadata = metadata; this.memoryReservationManager = Optional.ofNullable(memoryReservationManager); this.fragmentInstanceId = fragmentInstanceId; - this.outerQueryId = outerQueryId; + this.outerGlobalQueryId = outerGlobalQueryId; + this.outerLocalQueryId = outerLocalQueryId; this.ioTDBLocalFactory = ioTDBLocalFactory; } @@ -2028,8 +2033,12 @@ public class ColumnTransformerBuilder return fragmentInstanceId; } - public String getOuterQueryId() { - return outerQueryId; + public String getOuterGlobalQueryId() { + return outerGlobalQueryId; + } + + public long getOuterLocalQueryId() { + return outerLocalQueryId; } @Nullable diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java index d423db6dba6..f12dadbb87a 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java @@ -317,7 +317,8 @@ public abstract class TableOperatorGenerator< C context) { String fragmentInstanceId = getFragmentInstanceId(context); - String outerQueryId = getQueryId(context); + String outerGlobalQueryId = getQueryId(context); + long outerLocalQueryId = getLocalQueryId(context); IoTDBLocalFactory ioTDBLocalFactory = getIoTDBLocalFactory(context); final List<TSDataType> filterOutputDataTypes = new ArrayList<>(inputDataTypes); @@ -349,7 +350,8 @@ public abstract class TableOperatorGenerator< metadata, context.getMemoryReservationManager(), fragmentInstanceId, - outerQueryId, + outerGlobalQueryId, + outerLocalQueryId, ioTDBLocalFactory); return visitor.process(p, filterColumnTransformerContext); @@ -380,7 +382,8 @@ public abstract class TableOperatorGenerator< metadata, context.getMemoryReservationManager(), fragmentInstanceId, - outerQueryId, + outerGlobalQueryId, + outerLocalQueryId, ioTDBLocalFactory); for (Expression expression : projectExpressions) { @@ -413,6 +416,10 @@ public abstract class TableOperatorGenerator< return null; } + protected long getLocalQueryId(C context) { + return -1L; + } + protected IoTDBLocalFactory getIoTDBLocalFactory(C context) { return null; } @@ -2515,18 +2522,26 @@ public abstract class TableOperatorGenerator< protected IoTDBLocal createIoTDBLocal(C context) { IoTDBLocalFactory factory = getIoTDBLocalFactory(context); String fragmentInstanceId = getFragmentInstanceId(context); - String queryId = getQueryId(context); + String outerGlobalQueryId = getQueryId(context); + long outerLocalQueryId = getLocalQueryId(context); checkArgument(factory != null, "IoTDBLocalFactory must not be null for UDF execution"); checkArgument( fragmentInstanceId != null, "fragmentInstanceId must not be null for UDF execution"); - checkArgument(queryId != null, "queryId must not be null for UDF execution"); - return factory.create(getSessionInfo(context), fragmentInstanceId, queryId); + checkArgument(outerGlobalQueryId != null, "queryId must not be null for UDF execution"); + checkArgument( + outerLocalQueryId >= 0, "outerLocalQueryId must not be negative for UDF execution"); + return factory.create( + getSessionInfo(context), fragmentInstanceId, outerLocalQueryId, outerGlobalQueryId); } /** Factory for creating {@link IoTDBLocal} inside UDF column transformers. */ @FunctionalInterface public interface IoTDBLocalFactory { - IoTDBLocal create(SessionInfo sessionInfo, String fragmentInstanceId, String queryId); + IoTDBLocal create( + SessionInfo sessionInfo, + String fragmentInstanceId, + long outerLocalQueryId, + String outerGlobalQueryId); } } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java index ab2f99031d2..07d92d967f8 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java @@ -72,11 +72,15 @@ public class UserDefineScalarFunctionTransformer extends MultiColumnTransformer IoTDBLocalFactory factory = context.getIoTDBLocalFactory(); if (factory == null || context.getFragmentInstanceId() == null - || context.getOuterQueryId() == null) { + || context.getOuterGlobalQueryId() == null + || context.getOuterLocalQueryId() < 0) { return null; } return factory.create( - context.getSessionInfo(), context.getFragmentInstanceId(), context.getOuterQueryId()); + context.getSessionInfo(), + context.getFragmentInstanceId(), + context.getOuterLocalQueryId(), + context.getOuterGlobalQueryId()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 6bda6e9c14d..a41660ef556 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -152,6 +152,9 @@ public class FragmentInstanceContext extends QueryContext { // session info private SessionInfo sessionInfo; + // Coordinator-local query id of the outer query, used by IoTDBLocal UDF + private long localQueryId = -1L; + private final Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap; private DataNodeQueryContext dataNodeQueryContext; @@ -206,6 +209,7 @@ public class FragmentInstanceContext extends QueryContext { IDataRegionForQuery dataRegion, TimePredicate globalTimePredicate, Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap, + long localQueryId, boolean debug, boolean isVerbose) { FragmentInstanceContext instanceContext = @@ -216,6 +220,7 @@ public class FragmentInstanceContext extends QueryContext { dataRegion, globalTimePredicate, dataNodeQueryContextMap, + localQueryId, debug, isVerbose); instanceContext.initialize(); @@ -271,6 +276,7 @@ public class FragmentInstanceContext extends QueryContext { IDataRegionForQuery dataRegion, TimePredicate globalTimePredicate, Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap, + long localQueryId, boolean debug, boolean verbose) { super(debug, verbose); @@ -278,6 +284,7 @@ public class FragmentInstanceContext extends QueryContext { this.stateMachine = stateMachine; this.executionEndTime.set(END_TIME_INITIAL_VALUE); this.sessionInfo = sessionInfo; + this.localQueryId = localQueryId; this.dataRegion = dataRegion; this.globalTimeFilter = globalTimePredicate == null @@ -567,6 +574,10 @@ public class FragmentInstanceContext extends QueryContext { return sessionInfo; } + public long getLocalQueryId() { + return localQueryId; + } + public Optional<Throwable> getFailureCause() { return Optional.ofNullable( stateMachine.getFailureCauses().stream() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index e5f3cbc7e0d..75e447de586 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -167,6 +167,7 @@ public class FragmentInstanceManager { dataRegion, instance.getGlobalTimePredicate(), dataNodeQueryContextMap, + instance.getLocalQueryId(), instance.isDebug(), instance.isVerbose()); }); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index f7d1e9dc9fd..aa748cde854 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -2115,6 +2115,11 @@ public class DataNodeTableOperatorGenerator return context.getFragmentInstanceId().getQueryId().getId(); } + @Override + protected long getLocalQueryId(LocalExecutionPlanContext context) { + return context.getLocalQueryId(); + } + @Override protected IoTDBLocalFactory getIoTDBLocalFactory(LocalExecutionPlanContext context) { return IoTDBLocalImpl.FACTORY; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java index 65908708394..c9172bb44ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java @@ -198,6 +198,10 @@ public class LocalExecutionPlanContext implements ITableOperatorGeneratorContext return driverContext.getFragmentInstanceContext().getId(); } + public long getLocalQueryId() { + return driverContext.getFragmentInstanceContext().getLocalQueryId(); + } + public List<PipelineDriverFactory> getPipelineDriverFactories() { return pipelineDriverFactories; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index 4e34d361471..dec7c1917b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -148,6 +148,7 @@ public class SimpleFragmentParallelPlanner extends AbstractFragmentParallelPlann queryContext.isDebug(), fragment.isRoot(), queryContext.isVerbose()); + fragmentInstance.setLocalQueryId(queryContext.getLocalQueryId()); selectExecutorAndHost( fragment, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java index 463aeb54131..4cc98ee7d06 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java @@ -83,6 +83,9 @@ public class FragmentInstance implements IConsensusRequest { private final boolean debug; private final boolean verbose; + // Coordinator-local query id, used to look up IQueryExecution on DataNode + private long localQueryId = -1L; + // We can add some more params for a specific FragmentInstance // So that we can make different FragmentInstance owns different data range. @@ -269,6 +272,9 @@ public class FragmentInstance implements IConsensusRequest { hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null; fragmentInstance.isExplainAnalyze = ReadWriteIOUtils.readBool(buffer); fragmentInstance.setHighestPriority(ReadWriteIOUtils.readBool(buffer)); + if (buffer.hasRemaining()) { + fragmentInstance.setLocalQueryId(ReadWriteIOUtils.readLong(buffer)); + } return fragmentInstance; } @@ -296,6 +302,7 @@ public class FragmentInstance implements IConsensusRequest { } ReadWriteIOUtils.write(isExplainAnalyze, outputStream); ReadWriteIOUtils.write(isHighestPriority, outputStream); + ReadWriteIOUtils.write(localQueryId, outputStream); return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); } catch (IOException e) { LOGGER.error( @@ -342,6 +349,14 @@ public class FragmentInstance implements IConsensusRequest { return sessionInfo; } + public long getLocalQueryId() { + return localQueryId; + } + + public void setLocalQueryId(long localQueryId) { + this.localQueryId = localQueryId; + } + public boolean isExplainAnalyze() { return isExplainAnalyze; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java index f2942536cb5..e9695ec73f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java @@ -186,6 +186,7 @@ public class TableModelQueryFragmentPlanner extends AbstractFragmentParallelPlan queryContext.isDebug(), fragment.isRoot(), queryContext.isVerbose()); + fragmentInstance.setLocalQueryId(queryContext.getLocalQueryId()); selectExecutorAndHost( fragment, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java index 47ee7d07f27..940288a83ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java @@ -23,7 +23,6 @@ import org.apache.iotdb.calc.plan.planner.TableOperatorGenerator.IoTDBLocalFacto import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.QueryTimeoutException; import org.apache.iotdb.commons.queryengine.common.SessionInfo; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; @@ -47,13 +46,19 @@ public class IoTDBLocalImpl implements IoTDBLocal { private final SessionInfo sessionInfo; private final String fragmentInstanceId; - private final QueryId outerQueryId; + private final long outerLocalQueryId; + private final QueryId outerGlobalQueryId; private final List<UDFResultSetImpl> openResultSets = new ArrayList<>(); - public IoTDBLocalImpl(SessionInfo sessionInfo, String fragmentInstanceId, String outerQueryId) { + public IoTDBLocalImpl( + SessionInfo sessionInfo, + String fragmentInstanceId, + long outerLocalQueryId, + String outerGlobalQueryId) { this.sessionInfo = sessionInfo; this.fragmentInstanceId = fragmentInstanceId; - this.outerQueryId = QueryId.valueOf(outerQueryId); + this.outerLocalQueryId = outerLocalQueryId; + this.outerGlobalQueryId = QueryId.valueOf(outerGlobalQueryId); } @Override @@ -66,7 +71,7 @@ public class IoTDBLocalImpl implements IoTDBLocal { } InternalQueryResult result = InternalQueryExecutor.executeInternalQuery( - sessionInfo, fragmentInstanceId, outerQueryId, sql, timeoutMs); + sessionInfo, fragmentInstanceId, outerGlobalQueryId, sql, timeoutMs); int index = openResultSets.size(); UDFResultSetImpl rs = new UDFResultSetImpl(openResultSets, index, result); openResultSets.add(rs); @@ -92,16 +97,12 @@ public class IoTDBLocalImpl implements IoTDBLocal { } private long computeRemainingTimeoutMs() { - long outerStart = System.currentTimeMillis(); - long outerTimeout = IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(); - for (IQueryExecution execution : COORDINATOR.getAllQueryExecutions()) { - if (outerQueryId.getId().equals(execution.getQueryId())) { - outerStart = execution.getStartExecutionTime(); - outerTimeout = execution.getTimeout(); - break; - } + IQueryExecution execution = COORDINATOR.getQueryExecution(outerLocalQueryId); + if (execution == null) { + return 0; } - return outerTimeout - (System.currentTimeMillis() - outerStart); + return execution.getTimeout() + - (System.currentTimeMillis() - execution.getStartExecutionTime()); } @Override
