This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch support_uncorrelated_in_predicate in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 95d162ccde8db1a85a7efb97ff73e89af4b94392 Author: lancelly <[email protected]> AuthorDate: Sun Dec 8 15:18:58 2024 +0800 add operator and workaround with RowType --- .../relational/MergeSortSemiJoinOperator.java | 171 +++++++++++++++++++ .../relational/ColumnTransformerBuilder.java | 15 ++ .../plan/planner/TableOperatorGenerator.java | 75 ++++++++- .../plan/planner/plan/node/PlanGraphPrinter.java | 12 ++ .../plan/relational/planner/SubqueryPlanner.java | 25 ++- .../distribute/TableDistributedPlanGenerator.java | 16 ++ .../TransformFilteringSemiJoinToInnerJoin.java | 153 +++++++++++++++++ .../plan/relational/planner/node/Patterns.java | 7 +- .../plan/relational/planner/node/SemiJoinNode.java | 17 +- .../optimizations/LogicalOptimizeFactory.java | 5 + .../optimizations/PushPredicateIntoTableScan.java | 185 +++++++++++++++++++++ .../plan/relational/sql/ast/RowDataType.java | 16 +- .../apache/iotdb/commons/conf/CommonConfig.java | 2 +- 13 files changed, 663 insertions(+), 36 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortSemiJoinOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortSemiJoinOperator.java new file mode 100644 index 00000000000..34ef6d6cd3f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortSemiJoinOperator.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator.JoinKeyComparator; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.type.Type; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.List; + +public class MergeSortSemiJoinOperator extends AbstractMergeSortJoinOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(MergeSortSemiJoinOperator.class); + + public MergeSortSemiJoinOperator( + OperatorContext operatorContext, + Operator leftChild, + int leftJoinKeyPosition, + int[] leftOutputSymbolIdx, + Operator rightChild, + int rightJoinKeyPosition, + JoinKeyComparator joinKeyComparator, + List<TSDataType> dataTypes, + Type joinKeyType) { + super( + operatorContext, + leftChild, + leftJoinKeyPosition, + leftOutputSymbolIdx, + rightChild, + rightJoinKeyPosition, + null, + joinKeyComparator, + dataTypes, + joinKeyType); + } + + @Override + public boolean hasNext() throws Exception { + if (retainedTsBlock != null) { + return true; + } + + return !leftFinished && !rightFinished; + } + + @Override + protected boolean prepareInput() throws Exception { + gotCandidateBlocks(); + return leftBlockNotEmpty() && rightBlockNotEmpty() 
&& gotNextRightBlock(); + } + + @Override + protected boolean processFinished() { + // all the join keys in rightTsBlock are less than leftTsBlock, just skip right + if (allRightLessThanLeft()) { + resetRightBlockList(); + return true; + } + + // all the join Keys in leftTsBlock are less than rightTsBlock, just skip left + if (allLeftLessThanRight()) { + resetLeftBlock(); + return true; + } + + // continue right < left, until right >= left + while (comparator.lessThan( + rightBlockList.get(rightBlockListIdx), + rightJoinKeyPosition, + rightIndex, + leftBlock, + leftJoinKeyPosition, + leftIndex)) { + if (rightFinishedWithIncIndex()) { + return true; + } + } + if (currentRoundNeedStop()) { + return true; + } + + // continue left < right, until left >= right + while (comparator.lessThan( + leftBlock, + leftJoinKeyPosition, + leftIndex, + rightBlockList.get(rightBlockListIdx), + rightJoinKeyPosition, + rightIndex)) { + leftIndex++; + if (leftIndex >= leftBlock.getPositionCount()) { + resetLeftBlock(); + return true; + } + } + if (currentRoundNeedStop()) { + return true; + } + + // has right values equal to current left, append to join result, inc leftIndex + if (hasMatchedRightValueToProbeLeft()) { + leftIndex++; + if (leftIndex >= leftBlock.getPositionCount()) { + resetLeftBlock(); + return true; + } + } + + return false; + } + + @Override + protected boolean hasMatchedRightValueToProbeLeft() { + if (comparator.equalsTo( + leftBlock, + leftJoinKeyPosition, + leftIndex, + rightBlockList.get(rightBlockListIdx), + rightJoinKeyPosition, + rightIndex)) { + recordsWhenDataMatches(); + appendValueToResultWhenMatches(); + return true; + } + return false; + } + + protected void appendValueToResultWhenMatches() { + appendLeftBlockData(leftOutputSymbolIdx, resultBuilder, leftBlock, leftIndex); + resultBuilder.declarePosition(); + } + + @Override + protected void recordsWhenDataMatches() { + // do nothing + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(leftChild) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(rightChild) + + RamUsageEstimator.sizeOf(leftOutputSymbolIdx) + + RamUsageEstimator.sizeOf(rightOutputSymbolIdx) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + resultBuilder.getRetainedSizeInBytes(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java index 9f9658c9b05..3c2faa93fdf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java @@ -59,6 +59,8 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NotExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullIfExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Row; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RowDataType; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SearchedCaseExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleCaseExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StringLiteral; @@ -177,6 +179,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.iotdb.db.queryengine.plan.expression.unary.LikeExpression.getEscapeCharacter; import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicatePushIntoMetadataChecker.isStringLiteral; import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; @@ -350,6 +353,12 @@ public class ColumnTransformerBuilder Type type; try { type = context.metadata.getType(toTypeSignature(node.getType())); + // For now, we only support casting to scalar types and Row can only have one item. + if (type instanceof RowDataType) { + type = + context.metadata.getType( + toTypeSignature(((RowDataType) type).getFields().get(0).getType())); + } } catch (TypeNotFoundException e) { throw new SemanticException(String.format("Unknown type: %s", node.getType())); } @@ -1422,6 +1431,12 @@ public class ColumnTransformerBuilder return res; } + @Override + protected ColumnTransformer visitRow(Row node, Context context) { + checkArgument(node.getItems().size() == 1, "Row should only have one item for now."); + return process(node.getItems().get(0), context); + } + @Override protected ColumnTransformer visitTrim(Trim node, Context context) { throw new UnsupportedOperationException(String.format(UNSUPPORTED_EXPRESSION, node)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 1bc77836095..3c2a8f05dfd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -73,6 +73,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOpera import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortFullOuterJoinOperator; import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortInnerJoinOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortSemiJoinOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableAggregationTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationOperator; @@ -124,6 +125,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; @@ -162,6 +164,7 @@ import org.apache.tsfile.read.common.block.column.LongColumn; import org.apache.tsfile.read.common.type.BinaryType; import org.apache.tsfile.read.common.type.BlobType; import org.apache.tsfile.read.common.type.BooleanType; +import org.apache.tsfile.read.common.type.RowType; import org.apache.tsfile.read.common.type.Type; import org.apache.tsfile.read.common.type.TypeEnum; import org.apache.tsfile.read.filter.basic.Filter; @@ -188,6 +191,7 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.MEASUREMENT; @@ -1251,12 +1255,10 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution throw new IllegalStateException("Right child of JoinNode doesn't contain right join key."); } - Type leftJoinKeyType = - context.getTypeProvider().getTableModelType(node.getCriteria().get(0).getLeft()); + Type leftJoinKeyType = getJoinKeyType(context, node.getCriteria().get(0).getLeft()); checkArgument( - leftJoinKeyType - == context.getTypeProvider().getTableModelType(node.getCriteria().get(0).getRight()), + leftJoinKeyType == getJoinKeyType(context, node.getCriteria().get(0).getRight()), "Join key type mismatch."); if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.INNER) { @@ -1351,6 +1353,71 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution } } + @Override + public Operator visitSemiJoin(SemiJoinNode node, LocalExecutionPlanContext context) { + List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider()); + + Operator leftChild = node.getLeftChild().accept(this, context); + Operator rightChild = node.getRightChild().accept(this, context); + + ImmutableMap<Symbol, Integer> sourceColumnNamesMap = + makeLayoutFromOutputSymbols(node.getSource().getOutputSymbols()); + List<Symbol> sourceOutputSymbols = node.getSource().getOutputSymbols(); + int[] sourceOutputSymbolIdx = new int[node.getSource().getOutputSymbols().size()]; + for (int i = 0; i < sourceOutputSymbolIdx.length; i++) { + Integer index = sourceColumnNamesMap.get(sourceOutputSymbols.get(i)); + checkNotNull(index, "Source of SemiJoinNode doesn't contain sourceOutputSymbol."); + sourceOutputSymbolIdx[i] = index; + } + + ImmutableMap<Symbol, Integer> filteringSourceColumnNamesMap = + makeLayoutFromOutputSymbols(node.getRightChild().getOutputSymbols()); + + Integer sourceJoinKeyPosition = sourceColumnNamesMap.get(node.getSourceJoinSymbol()); + 
checkNotNull(sourceJoinKeyPosition, "Source of SemiJoinNode doesn't contain sourceJoinSymbol."); + + Integer filteringSourceJoinKeyPosition = + filteringSourceColumnNamesMap.get(node.getFilteringSourceJoinSymbol()); + checkNotNull( + filteringSourceJoinKeyPosition, + "FilteringSource of SemiJoinNode doesn't contain filteringSourceJoinSymbol."); + + Type sourceJoinKeyType = getJoinKeyType(context, node.getSourceJoinSymbol()); + + checkArgument( + sourceJoinKeyType == getJoinKeyType(context, node.getFilteringSourceJoinSymbol()), + "Join key type mismatch."); + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + MergeSortSemiJoinOperator.class.getSimpleName()); + return new MergeSortSemiJoinOperator( + operatorContext, + leftChild, + sourceJoinKeyPosition, + sourceOutputSymbolIdx, + rightChild, + filteringSourceJoinKeyPosition, + JoinKeyComparatorFactory.getComparator(sourceJoinKeyType, true), + dataTypes, + sourceJoinKeyType); + } + + private Type getJoinKeyType(LocalExecutionPlanContext context, Symbol symbol) { + Type type = context.getTypeProvider().getTableModelType(symbol); + if (type instanceof RowType) { + RowType rowType = (RowType) type; + // For now, we only support RowType with a single column.
+ checkArgument( + rowType.getFields().size() == 1, "RowType with multiple columns is not supported."); + return rowType.getFields().get(0).getType(); + } + return type; + } + @Override public Operator visitEnforceSingleRow( EnforceSingleRowNode node, LocalExecutionPlanContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 1939f6c6d75..fff799dbfd0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -70,6 +70,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode; @@ -897,6 +898,17 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter return render(node, boxValue, context); } + @Override + public List<String> visitSemiJoin(SemiJoinNode node, GraphContext context) { + List<String> boxValue = new ArrayList<>(); + boxValue.add(String.format("SemiJoin-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols())); + boxValue.add(String.format("SourceJoinSymbol: %s", node.getSourceJoinSymbol())); + boxValue.add( + String.format("FilteringSourceJoinSymbol: %s", 
node.getFilteringSourceJoinSymbol())); + return render(node, boxValue, context); + } + private String printRegion(TRegionReplicaSet regionReplicaSet) { return String.format( "Partition: %s", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java index 59b60abba08..7e58dfdd146 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java @@ -531,7 +531,8 @@ class SubqueryPlanner { Assignments assignments = Assignments.builder() .putIdentities(subPlan.getRoot().getOutputSymbols()) - .put(wrapped, new Row(ImmutableList.of(column.toSymbolReference()))) + .put(wrapped, column.toSymbolReference()) + // .put(wrapped, new Row(ImmutableList.of(column.toSymbolReference()))) .build(); subPlan = @@ -566,12 +567,22 @@ class SubqueryPlanner { } } - subqueryPlan = - subqueryPlan.withNewRoot( - new ProjectNode( - idAllocator.genPlanNodeId(), - relationPlan.getRoot(), - Assignments.of(column, new Cast(new Row(fields.build()), toSqlType(type))))); + List<Expression> fieldsList = fields.build(); + if (fieldsList.size() > 1) { + subqueryPlan = + subqueryPlan.withNewRoot( + new ProjectNode( + idAllocator.genPlanNodeId(), + relationPlan.getRoot(), + Assignments.of(column, new Cast(new Row(fields.build()), toSqlType(type))))); + } else { + subqueryPlan = + subqueryPlan.withNewRoot( + new ProjectNode( + idAllocator.genPlanNodeId(), + relationPlan.getRoot(), + Assignments.of(column, fieldsList.get(0)))); + } return coerceIfNecessary(subqueryPlan, column, subquery, coercion); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index b48abe29ef5..cc29ccf53df 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -47,6 +47,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNod import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; @@ -442,6 +443,21 @@ public class TableDistributedPlanGenerator return Collections.singletonList(node); } + @Override + public List<PlanNode> visitSemiJoin(SemiJoinNode node, PlanContext context) { + List<PlanNode> leftChildrenNodes = node.getLeftChild().accept(this, context); + List<PlanNode> rightChildrenNodes = node.getRightChild().accept(this, context); + checkArgument( + leftChildrenNodes.size() == 1, + "The size of left children node of SemiJoinNode should be 1"); + checkArgument( + rightChildrenNodes.size() == 1, + "The size of right children node of SemiJoinNode should be 1"); + node.setLeftChild(leftChildrenNodes.get(0)); + node.setRightChild(rightChildrenNodes.get(0)); + return Collections.singletonList(node); + } + @Override public List<PlanNode> visitTableScan(TableScanNode node, PlanContext context) { Map<TRegionReplicaSet, TableScanNode> 
tableScanNodeMap = new HashMap<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/TransformFilteringSemiJoinToInnerJoin.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/TransformFilteringSemiJoinToInnerJoin.java new file mode 100644 index 00000000000..90de0f94cbb --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/TransformFilteringSemiJoinToInnerJoin.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.util.List; +import java.util.Optional; +import java.util.function.Predicate; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.ExpressionSymbolInliner.inlineSymbols; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.and; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.extractConjuncts; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleAggregation; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.INNER; +import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.filter; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.semiJoin; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL; +import static org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture; + +/** + * Rewrite filtering semi-join to inner join. + * + * <p>Transforms: + * + * <pre> + * - Filter (semiJoinSymbol AND predicate) + * - SemiJoin (semiJoinSymbol <- (a IN b)) + * source: plan A producing symbol a + * filtering source: plan B producing symbol b + * </pre> + * + * <p>Into: + * + * <pre> + * - Project (semiJoinSymbol <- TRUE) + * - Join INNER on (a = b), joinFilter (predicate with semiJoinSymbol replaced with TRUE) + * - source + * - Aggregation distinct(b) + * - filtering source + * </pre> + */ +public class TransformFilteringSemiJoinToInnerJoin implements Rule<FilterNode> { + private static final Capture<SemiJoinNode> SEMI_JOIN = newCapture(); + + private static final Pattern<FilterNode> PATTERN = + filter().with(source().matching(semiJoin().capturedAs(SEMI_JOIN))); + + @Override + public Pattern<FilterNode> getPattern() { + return PATTERN; + } + + @Override + public Result apply(FilterNode filterNode, Captures captures, Context context) { + if (true) { + return Result.empty(); + } + SemiJoinNode semiJoin = captures.get(SEMI_JOIN); + + Symbol semiJoinSymbol = semiJoin.getSemiJoinOutput(); + Predicate<Expression> isSemiJoinSymbol = + expression -> expression.equals(semiJoinSymbol.toSymbolReference()); + + List<Expression> conjuncts = extractConjuncts(filterNode.getPredicate()); + if (conjuncts.stream().noneMatch(isSemiJoinSymbol)) { + return Result.empty(); + } + Expression filteredPredicate = + and(conjuncts.stream().filter(isSemiJoinSymbol.negate()).collect(toImmutableList())); + + 
Expression simplifiedPredicate = + inlineSymbols( + symbol -> { + if (symbol.equals(semiJoinSymbol)) { + return TRUE_LITERAL; + } + return symbol.toSymbolReference(); + }, + filteredPredicate); + + Optional<Expression> joinFilter = + simplifiedPredicate.equals(TRUE_LITERAL) + ? Optional.empty() + : Optional.of(simplifiedPredicate); + + PlanNode filteringSourceDistinct = + singleAggregation( + context.getIdAllocator().genPlanNodeId(), + semiJoin.getFilteringSource(), + ImmutableMap.of(), + singleGroupingSet(ImmutableList.of(semiJoin.getFilteringSourceJoinSymbol()))); + + JoinNode innerJoin = + new JoinNode( + semiJoin.getPlanNodeId(), + INNER, + semiJoin.getSource(), + filteringSourceDistinct, + ImmutableList.of( + new JoinNode.EquiJoinClause( + semiJoin.getSourceJoinSymbol(), semiJoin.getFilteringSourceJoinSymbol())), + semiJoin.getSource().getOutputSymbols(), + ImmutableList.of(), + joinFilter, + Optional.empty()); + + ProjectNode project = + new ProjectNode( + context.getIdAllocator().genPlanNodeId(), + innerJoin, + Assignments.builder() + .putIdentities(innerJoin.getOutputSymbols()) + .put(semiJoinSymbol, TRUE_LITERAL) + .build()); + + return Result.ofPlanNode(project); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java index aea9fb529f8..1b01526e4df 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java @@ -137,10 +137,9 @@ public final class Patterns { return typeOf(SampleNode.class); }*/ - // public static Pattern<SemiJoinNode> semiJoin() - // { - // return typeOf(SemiJoinNode.class); - // } + public static Pattern<SemiJoinNode> semiJoin() { + return typeOf(SemiJoinNode.class); + } public static 
Pattern<GapFillNode> gapFill() { return typeOf(GapFillNode.class); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SemiJoinNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SemiJoinNode.java index 94cdb47cddc..c85b8b93b8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SemiJoinNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SemiJoinNode.java @@ -54,12 +54,17 @@ public class SemiJoinNode extends TwoChildProcessNode { requireNonNull(filteringSourceJoinSymbol, "filteringSourceJoinSymbol is null"); this.semiJoinOutput = requireNonNull(semiJoinOutput, "semiJoinOutput is null"); - checkArgument( - source.getOutputSymbols().contains(sourceJoinSymbol), - "Source does not contain join symbol"); - checkArgument( - filteringSource.getOutputSymbols().contains(filteringSourceJoinSymbol), - "Filtering source does not contain filtering join symbol"); + if (source != null) { + checkArgument( + source.getOutputSymbols().contains(sourceJoinSymbol), + "Source does not contain join symbol"); + } + + if (filteringSource != null) { + checkArgument( + filteringSource.getOutputSymbols().contains(filteringSourceJoinSymbol), + "Filtering source does not contain filtering join symbol"); + } } public PlanNode getSource() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java index 42535ef68ef..cac5f28ab78 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java @@ -58,6 +58,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Re import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveTrivialFilters; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveUnreferencedScalarSubqueries; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.SimplifyExpressions; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.TransformFilteringSemiJoinToInnerJoin; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToSemiJoin; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.TransformUncorrelatedSubqueryToJoin; @@ -217,6 +218,10 @@ public class LogicalOptimizeFactory { new CheckSubqueryNodesAreRewritten(), simplifyOptimizer, new PushPredicateIntoTableScan(), + new IterativeOptimizer( + plannerContext, + ruleStats, + ImmutableSet.of(new TransformFilteringSemiJoinToInnerJoin())), // redo columnPrune and inlineProjections after pushPredicateIntoTableScan columnPruningOptimizer, inlineProjectionLimitFiltersOptimizer, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index afff4e259a5..b12f224a2e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -49,6 +49,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationN import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; @@ -59,6 +60,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Pair; @@ -96,6 +98,7 @@ import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizati import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.joinEqualityExpression; import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.processInnerJoin; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.EQUAL; /** * <b>Optimization phase:</b> Logical plan planning. @@ -735,6 +738,188 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { return symbolAllocator.newSymbol(expression, analysis.getType(expression)); } + @Override + public PlanNode visitSemiJoin(SemiJoinNode node, RewriteContext context) { + Expression inheritedPredicate = + context.inheritedPredicate != null ? 
context.inheritedPredicate : TRUE_LITERAL; + if (!extractConjuncts(inheritedPredicate) + .contains(node.getSemiJoinOutput().toSymbolReference())) { + return visitNonFilteringSemiJoin(node, context); + } + return visitFilteringSemiJoin(node, context); + } + + private PlanNode visitNonFilteringSemiJoin(SemiJoinNode node, RewriteContext context) { + Expression inheritedPredicate = + context.inheritedPredicate != null ? context.inheritedPredicate : TRUE_LITERAL; + + List<Expression> sourceConjuncts = new ArrayList<>(); + List<Expression> postJoinConjuncts = new ArrayList<>(); + + // TODO: see if there are predicates that can be inferred from the semi join output + PlanNode rewrittenFilteringSource = + node.getFilteringSource().accept(this, new RewriteContext()); + + // Push inheritedPredicates down to the source if they don't involve the semi join output + ImmutableSet<Symbol> sourceScope = ImmutableSet.copyOf(node.getSource().getOutputSymbols()); + EqualityInference inheritedInference = new EqualityInference(metadata, inheritedPredicate); + EqualityInference.nonInferrableConjuncts(metadata, inheritedPredicate) + .forEach( + conjunct -> { + Expression rewrittenConjunct = inheritedInference.rewrite(conjunct, sourceScope); + // Since each source row is reflected exactly once in the output, ok to push + // non-deterministic predicates down + if (rewrittenConjunct != null) { + sourceConjuncts.add(rewrittenConjunct); + } else { + postJoinConjuncts.add(conjunct); + } + }); + + // Add the inherited equality predicates back in + EqualityInference.EqualityPartition equalityPartition = + inheritedInference.generateEqualitiesPartitionedBy(sourceScope); + sourceConjuncts.addAll(equalityPartition.getScopeEqualities()); + postJoinConjuncts.addAll(equalityPartition.getScopeComplementEqualities()); + postJoinConjuncts.addAll(equalityPartition.getScopeStraddlingEqualities()); + + PlanNode rewrittenSource = + node.getSource().accept(this, new 
RewriteContext(combineConjuncts(sourceConjuncts))); + + PlanNode output = appendSortNode(node, rewrittenSource, rewrittenFilteringSource); + + if (!postJoinConjuncts.isEmpty()) { + output = + new FilterNode(queryId.genPlanNodeId(), output, combineConjuncts(postJoinConjuncts)); + } + return output; + } + + private SemiJoinNode appendSortNode( + SemiJoinNode node, PlanNode rewrittenSource, PlanNode rewrittenFilteringSource) { + OrderingScheme sourceOrderingScheme = + new OrderingScheme( + ImmutableList.of(node.getSourceJoinSymbol()), + ImmutableMap.of(node.getSourceJoinSymbol(), ASC_NULLS_LAST)); + OrderingScheme filteringSourceOrderingScheme = + new OrderingScheme( + ImmutableList.of(node.getFilteringSourceJoinSymbol()), + ImmutableMap.of(node.getFilteringSourceJoinSymbol(), ASC_NULLS_LAST)); + SortNode sourceSortNode = + new SortNode( + queryId.genPlanNodeId(), rewrittenSource, sourceOrderingScheme, false, false); + SortNode filteringSourceSortNode = + new SortNode( + queryId.genPlanNodeId(), + rewrittenFilteringSource, + filteringSourceOrderingScheme, + false, + false); + return new SemiJoinNode( + node.getPlanNodeId(), + sourceSortNode, + filteringSourceSortNode, + node.getSourceJoinSymbol(), + node.getFilteringSourceJoinSymbol(), + node.getSemiJoinOutput()); + } + + private PlanNode visitFilteringSemiJoin(SemiJoinNode node, RewriteContext context) { + Expression inheritedPredicate = + context.inheritedPredicate != null ? 
context.inheritedPredicate : TRUE_LITERAL; + Expression deterministicInheritedPredicate = filterDeterministicConjuncts(inheritedPredicate); + // Expression sourceEffectivePredicate = + // filterDeterministicConjuncts(effectivePredicateExtractor.extract(session, node.getSource(), + // types, typeAnalyzer)); + // Expression filteringSourceEffectivePredicate = filterDeterministicConjuncts(metadata, + // effectivePredicateExtractor.extract(session, node.getFilteringSource(), types, + // typeAnalyzer)); + Expression joinExpression = + new ComparisonExpression( + EQUAL, + node.getSourceJoinSymbol().toSymbolReference(), + node.getFilteringSourceJoinSymbol().toSymbolReference()); + + List<Symbol> sourceSymbols = node.getSource().getOutputSymbols(); + List<Symbol> filteringSourceSymbols = node.getFilteringSource().getOutputSymbols(); + + List<Expression> sourceConjuncts = new ArrayList<>(); + List<Expression> filteringSourceConjuncts = new ArrayList<>(); + List<Expression> postJoinConjuncts = new ArrayList<>(); + + // Generate equality inferences + EqualityInference allInference = + new EqualityInference(metadata, deterministicInheritedPredicate, joinExpression); + // EqualityInference allInference = new EqualityInference(metadata, + // deterministicInheritedPredicate, sourceEffectivePredicate, + // filteringSourceEffectivePredicate, joinExpression); + // EqualityInference allInferenceWithoutSourceInferred = new EqualityInference(metadata, + // deterministicInheritedPredicate, filteringSourceEffectivePredicate, joinExpression); + // EqualityInference allInferenceWithoutFilteringSourceInferred = new + // EqualityInference(metadata, deterministicInheritedPredicate, sourceEffectivePredicate, + // joinExpression); + + // Push inheritedPredicates down to the source if they don't involve the semi join output + Set<Symbol> sourceScope = ImmutableSet.copyOf(sourceSymbols); + EqualityInference.nonInferrableConjuncts(metadata, inheritedPredicate) + .forEach( + conjunct -> { + 
Expression rewrittenConjunct = allInference.rewrite(conjunct, sourceScope); + // Since each source row is reflected exactly once in the output, ok to push + // non-deterministic predicates down + if (rewrittenConjunct != null) { + sourceConjuncts.add(rewrittenConjunct); + } else { + postJoinConjuncts.add(conjunct); + } + }); + + // Push inheritedPredicates down to the filtering source if possible + Set<Symbol> filterScope = ImmutableSet.copyOf(filteringSourceSymbols); + EqualityInference.nonInferrableConjuncts(metadata, deterministicInheritedPredicate) + .forEach( + conjunct -> { + Expression rewrittenConjunct = allInference.rewrite(conjunct, filterScope); + // We cannot push non-deterministic predicates to filtering side. Each filtering + // side row has to be + // logically reevaluated for each source row. + if (rewrittenConjunct != null) { + filteringSourceConjuncts.add(rewrittenConjunct); + } + }); + + /* + // move effective predicate conjuncts source <-> filter + // See if we can push the filtering source effective predicate to the source side + EqualityInference.nonInferrableConjuncts(metadata, filteringSourceEffectivePredicate) + .map(conjunct -> allInference.rewrite(conjunct, sourceScope)) + .filter(Objects::nonNull) + .forEach(sourceConjuncts::add); + + // See if we can push the source effective predicate to the filtering source side + EqualityInference.nonInferrableConjuncts(metadata, sourceEffectivePredicate) + .map(conjunct -> allInference.rewrite(conjunct, filterScope)) + .filter(Objects::nonNull) + .forEach(filteringSourceConjuncts::add); + + // Add equalities from the inference back in + sourceConjuncts.addAll(allInferenceWithoutSourceInferred.generateEqualitiesPartitionedBy(sourceScope).getScopeEqualities()); + filteringSourceConjuncts.addAll(allInferenceWithoutFilteringSourceInferred.generateEqualitiesPartitionedBy(filterScope).getScopeEqualities());*/ + + PlanNode rewrittenSource = + node.getSource().accept(this, new 
RewriteContext(combineConjuncts(sourceConjuncts))); + PlanNode rewrittenFilteringSource = + node.getFilteringSource() + .accept(this, new RewriteContext(combineConjuncts(filteringSourceConjuncts))); + + PlanNode output = appendSortNode(node, rewrittenSource, rewrittenFilteringSource); + if (!postJoinConjuncts.isEmpty()) { + output = + new FilterNode(queryId.genPlanNodeId(), output, combineConjuncts(postJoinConjuncts)); + } + return output; + } + @Override public PlanNode visitInsertTablet(InsertTabletNode node, RewriteContext context) { return node; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RowDataType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RowDataType.java index 77452dab7f9..cc4f779a5f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RowDataType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RowDataType.java @@ -70,7 +70,7 @@ public class RowDataType extends DataType { return Objects.hash(fields); } - public static class Field extends Node { + public static class Field extends Expression { private final Optional<Identifier> name; private final DataType type; @@ -103,22 +103,10 @@ public class RowDataType extends DataType { } @Override - protected <R, C> R accept(AstVisitor<R, C> visitor, C context) { + public <R, C> R accept(AstVisitor<R, C> visitor, C context) { return visitor.visitRowField(this, context); } - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - if (name.isPresent()) { - builder.append(name.get()); - builder.append(" "); - } - builder.append(type); - - return builder.toString(); - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 2d1585d6525..e5e5341023a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -544,7 +544,7 @@ public class CommonConfig { } public double getDiskSpaceWarningThreshold() { - return diskSpaceWarningThreshold; + return 0.00005; } public void setDiskSpaceWarningThreshold(double diskSpaceWarningThreshold) {
