This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch fold_timefilter_scalar_subquery in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f5d0cc64b35b6690de1bb851e907882b5c18d812 Author: lancelly <[email protected]> AuthorDate: Fri Jun 6 15:34:44 2025 +0800 execute uncorrelated scalar subquery in predicate in advance to utilize predicate pushdown --- .../ConvertPredicateToTimeFilterVisitor.java | 10 +- .../plan/relational/planner/SubqueryPlanner.java | 10 ++ ...ithUncorrelatedScalarSubqueryReconstructor.java | 171 +++++++++++++++++++++ .../relational/sql/ast/ComparisonExpression.java | 12 +- .../plan/relational/sql/ast/LongLiteral.java | 2 +- .../relational/planner/CorrelatedSubqueryTest.java | 3 - 6 files changed, 201 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java index b9ab2109a15..ff0366848f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BetweenPredicate; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DoubleLiteral; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.IfExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InListExpression; @@ -225,6 +226,13 @@ public class ConvertPredicateToTimeFilterVisitor extends PredicateVisitor<Filter } public static long getLongValue(Expression expression) { - return ((LongLiteral) expression).getParsedValue(); + if (expression instanceof LongLiteral) { + return ((LongLiteral) expression).getParsedValue(); + } else if (expression instanceof DoubleLiteral) { + return (long) ((DoubleLiteral) expression).getValue(); + } else { + throw new IllegalArgumentException( + "Expression should be LongLiteral or DoubleLiteral, but got: " + expression.getClass()); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java index ebc32b6a247..f9e62930f02 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.RelationType; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Scope; import org.apache.iotdb.db.queryengine.plan.relational.planner.QueryPlanner.PlanAndMappings; +import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.PredicateWithUncorrelatedScalarSubqueryReconstructor; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; @@ -125,6 +126,9 @@ class SubqueryPlanner { List<SubqueryExpression> scalarSubqueries = subqueries.getSubqueries(); if (!scalarSubqueries.isEmpty()) { + // try to execute un-correlated scalar subqueries in the predicate in advance to utilize + // predicate pushdown if possible + tryFoldUncorrelatedScalarSubqueryInPredicate(expression, plannerContext); for (Cluster<SubqueryExpression> cluster : cluster(builder.getScope(), selectSubqueries(builder, expression, scalarSubqueries))) { builder = planScalarSubquery(builder, cluster); @@ -151,6 +155,12 @@ class SubqueryPlanner { return builder; } + private void tryFoldUncorrelatedScalarSubqueryInPredicate( + Expression expression, MPPQueryContext context) { + PredicateWithUncorrelatedScalarSubqueryReconstructor + .reconstructPredicateWithUncorrelatedScalarSubquery(expression, context); + } + /** * Find subqueries from the candidate set that are children of the given parent and that have not * already been handled in the subplan diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/PredicateWithUncorrelatedScalarSubqueryReconstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/PredicateWithUncorrelatedScalarSubqueryReconstructor.java new file mode 100644 index 00000000000..dc933cd6545 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/PredicateWithUncorrelatedScalarSubqueryReconstructor.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.ir; + +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.plan.Coordinator; +import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BinaryLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DoubleLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NotExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SubqueryExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.read.common.block.TsBlock; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; + +public class PredicateWithUncorrelatedScalarSubqueryReconstructor { + + private static final SqlParser relationSqlParser = new SqlParser(); + + private static final Coordinator coordinator = Coordinator.getInstance(); + + private PredicateWithUncorrelatedScalarSubqueryReconstructor() { + // utility class + } + + public static void reconstructPredicateWithUncorrelatedScalarSubquery( + Expression expression, MPPQueryContext context) { + if (expression instanceof LogicalExpression) { + LogicalExpression logicalExpression = (LogicalExpression) expression; + for (Expression term : logicalExpression.getTerms()) { + reconstructPredicateWithUncorrelatedScalarSubquery(term, context); + } + } else if (expression instanceof NotExpression) { + NotExpression notExpression = (NotExpression) expression; + reconstructPredicateWithUncorrelatedScalarSubquery(notExpression.getValue(), context); + } else if (expression instanceof ComparisonExpression) { + ComparisonExpression comparisonExpression = (ComparisonExpression) expression; + Expression left = comparisonExpression.getLeft(); + Expression right = comparisonExpression.getRight(); + if (left instanceof Identifier && right instanceof SubqueryExpression) { + Optional<Literal> result = + fetchUncorrelatedSubqueryResultForPredicate((SubqueryExpression) right, context); + // If the subquery result is not present, we cannot reconstruct the predicate. + if (result.isPresent()) { + right = result.get(); + } + } else if (right instanceof Identifier && left instanceof SubqueryExpression) { + Optional<Literal> result = + fetchUncorrelatedSubqueryResultForPredicate((SubqueryExpression) left, context); + if (result.isPresent()) { + left = result.get(); + } + } + comparisonExpression.setLeft(left); + comparisonExpression.setRight(right); + } + } + + /** + * @return an Optional containing the result of the uncorrelated scalar subquery. Returns + * Optional.empty() if the subquery cannot be executed in advance or if it does not return a + * valid result. + */ + private static Optional<Literal> fetchUncorrelatedSubqueryResultForPredicate( + SubqueryExpression subqueryExpression, MPPQueryContext context) { + final long queryId = SessionManager.getInstance().requestQueryId(); + Throwable t = null; + + try { + final ExecutionResult executionResult = + coordinator.executeForTableModel( + subqueryExpression.getQuery(), + relationSqlParser, + SessionManager.getInstance().getCurrSession(), + queryId, + SessionManager.getInstance() + .getSessionInfoOfTableModel(SessionManager.getInstance().getCurrSession()), + "Try to Fetch Uncorrelated Scalar Subquery Result for Predicate", + LocalExecutionPlanner.getInstance().metadata, + context.getTimeOut(), + false); + + // This may occur when the subquery cannot be executed in advance (for example, with + // correlated scalar subqueries). + // Since we cannot determine the subquery's validity beforehand, we must submit the subquery. + // This approach may slow down filter involving correlated scalar subqueries. + if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return Optional.empty(); + } + + while (coordinator.getQueryExecution(queryId).hasNextResult()) { + final Optional<TsBlock> tsBlock; + try { + tsBlock = coordinator.getQueryExecution(queryId).getBatchResult(); + } catch (final IoTDBException e) { + t = e; + throw new RuntimeException("Failed to Fetch Subquery Result.", e); + } + if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { + continue; + } + final Column[] columns = tsBlock.get().getValueColumns(); + checkArgument(columns.length == 1, "Scalar Subquery result should only have one column."); + checkArgument( + tsBlock.get().getPositionCount() == 1 && !tsBlock.get().getColumn(0).isNull(0), + "Scalar Subquery result should only have one row."); + switch (columns[0].getDataType()) { + case INT32: + case DATE: + return Optional.of(new LongLiteral(Long.toString(columns[0].getInt(0)))); + case INT64: + case TIMESTAMP: + return Optional.of(new LongLiteral(Long.toString(columns[0].getLong(0)))); + case FLOAT: + return Optional.of(new DoubleLiteral(Double.toString(columns[0].getFloat(0)))); + case DOUBLE: + return Optional.of(new DoubleLiteral(Double.toString(columns[0].getDouble(0)))); + case BOOLEAN: + return Optional.of(new BooleanLiteral(Boolean.toString(columns[0].getBoolean(0)))); + case BLOB: + case TEXT: + case STRING: + return Optional.of(new BinaryLiteral(columns[0].getBinary(0).toString())); + default: + throw new IllegalArgumentException( + String.format( + "Unsupported data type for scalar subquery result: %s", + columns[0].getDataType())); + } + } + } catch (final Throwable throwable) { + t = throwable; + } finally { + coordinator.cleanupQueryExecution(queryId, null, t); + } + return Optional.empty(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ComparisonExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ComparisonExpression.java index a70badc932a..5786fa26ccb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ComparisonExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ComparisonExpression.java @@ -96,8 +96,8 @@ public class ComparisonExpression extends Expression { } private final Operator operator; - private final Expression left; - private final Expression right; + private Expression left; + private Expression right; public ComparisonExpression(Operator operator, Expression left, Expression right) { super(null); @@ -134,6 +134,14 @@ public class ComparisonExpression extends Expression { return right; } + public void setLeft(Expression left) { + this.left = left; + } + + public void setRight(Expression right) { + this.right = right; + } + @Override public <R, C> R accept(AstVisitor<R, C> visitor, C context) { return visitor.visitComparisonExpression(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LongLiteral.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LongLiteral.java index f9b8a14c035..65d4bf7123f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LongLiteral.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LongLiteral.java @@ -99,7 +99,7 @@ public class LongLiteral extends Literal { return parsedValue == ((LongLiteral) other).parsedValue; } - private static long parse(String value) { + public static long parse(String value) { value = value.replace("_", ""); if (value.startsWith("0x") || value.startsWith("0X")) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/CorrelatedSubqueryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/CorrelatedSubqueryTest.java index e7baf8078f1..ccc6db6620b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/CorrelatedSubqueryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/CorrelatedSubqueryTest.java @@ -63,9 +63,6 @@ public class CorrelatedSubqueryTest { PlanMatchPattern tableScan2 = tableScan("testdb.table2", ImmutableMap.of("s2_7", "s2")); - Expression filterPredicate = - new CoalesceExpression(new BooleanLiteral("true"), new BooleanLiteral("false")); - // Verify full LogicalPlan /* * └──OutputNode
