This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch udtf-optimize in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 249691ced288fd3e5ec5a63d1be50b628cbc53b5 Author: Chen YZ <[email protected]> AuthorDate: Sat Feb 22 16:06:03 2025 +0800 done --- .../apache/iotdb/udf/table/HOPTableFunction.java | 158 +++++++++++ .../it/query/recent/IoTDBFillTableIT.java | 12 +- .../plan/planner/plan/node/PlanGraphPrinter.java | 11 + .../plan/planner/plan/node/PlanNodeType.java | 4 + .../plan/planner/plan/node/PlanVisitor.java | 5 + .../plan/relational/planner/OrderingScheme.java | 12 + .../plan/relational/planner/QueryPlanner.java | 8 +- .../distribute/TableDistributedPlanGenerator.java | 158 +++++++++-- .../TableModelTypeProviderExtractor.java | 7 + .../rule/ImplementTableFunctionSource.java | 31 ++- .../plan/relational/planner/node/AuxSortNode.java | 139 ++++++++++ .../optimizations/LogicalOptimizeFactory.java | 3 +- .../planner/optimizations/ParallelizeAuxSort.java | 289 +++++++++++++++++++++ .../PushLimitOffsetIntoTableScan.java | 6 + .../optimizations/TransformSortToStreamSort.java | 6 + .../optimizations/UnaliasSymbolReferences.java | 21 ++ 16 files changed, 834 insertions(+), 36 deletions(-) diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/table/HOPTableFunction.java b/example/udf/src/main/java/org/apache/iotdb/udf/table/HOPTableFunction.java new file mode 100644 index 00000000000..fb9f7beac33 --- /dev/null +++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/HOPTableFunction.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.table; + +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; +import org.apache.iotdb.udf.api.relational.table.argument.Argument; +import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; +import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; +import org.apache.iotdb.udf.api.relational.table.argument.TableArgument; +import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor; +import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.block.column.ColumnBuilder; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class HOPTableFunction implements TableFunction { + + private static final String DATA_PARAMETER_NAME = "DATA"; + private static final String TIMECOL_PARAMETER_NAME = "TIMECOL"; + private static final String SLIDE_PARAMETER_NAME = "SLIDE"; + private static final String SIZE_PARAMETER_NAME = 
"SIZE"; + private static final String START_PARAMETER_NAME = "START"; + + @Override + public List<ParameterSpecification> getArgumentsSpecifications() { + return Arrays.asList( + TableParameterSpecification.builder() + .name(DATA_PARAMETER_NAME) + .passThroughColumns() + .keepWhenEmpty() + .build(), + ScalarParameterSpecification.builder() + .name(TIMECOL_PARAMETER_NAME) + .type(Type.STRING) + .build(), + ScalarParameterSpecification.builder().name(SLIDE_PARAMETER_NAME).type(Type.INT64).build(), + ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(), + ScalarParameterSpecification.builder() + .name(START_PARAMETER_NAME) + .type(Type.TIMESTAMP) + .defaultValue(Long.MIN_VALUE) + .build()); + } + + private int findTimeColumnIndex(TableArgument tableArgument, String expectedFieldName) { + int requiredIndex = -1; + for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) { + Optional<String> fieldName = tableArgument.getFieldNames().get(i); + if (fieldName.isPresent() && expectedFieldName.equalsIgnoreCase(fieldName.get())) { + requiredIndex = i; + break; + } + } + return requiredIndex; + } + + @Override + public TableFunctionAnalysis analyze(Map<String, Argument> arguments) { + TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME); + String expectedFieldName = + (String) ((ScalarArgument) arguments.get(TIMECOL_PARAMETER_NAME)).getValue(); + int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName); + if (requiredIndex == -1) { + throw new UDFException("The required field is not found in the input table"); + } + DescribedSchema properColumnSchema = + new DescribedSchema.Builder() + .addField("window_start", Type.TIMESTAMP) + .addField("window_end", Type.TIMESTAMP) + .build(); + + // outputColumnSchema + return TableFunctionAnalysis.builder() + .properColumnSchema(properColumnSchema) + .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex)) + .build(); + } + + 
@Override + public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + return new TableFunctionProcessorProvider() { + @Override + public TableFunctionDataProcessor getDataProcessor() { + return new HOPDataProcessor( + (Long) ((ScalarArgument) arguments.get(START_PARAMETER_NAME)).getValue(), + (Long) ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue(), + (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); + } + }; + } + + private static class HOPDataProcessor implements TableFunctionDataProcessor { + + private final long slide; + private final long size; + private long curTime; + private long curIndex = 0; + + public HOPDataProcessor(long startTime, long slide, long size) { + this.slide = slide; + this.size = size; + this.curTime = startTime; + } + + @Override + public void process( + Record input, + List<ColumnBuilder> properColumnBuilders, + ColumnBuilder passThroughIndexBuilder) { + long timeValue = input.getLong(0); + if (curTime == Long.MIN_VALUE) { + curTime = timeValue; + } + if (curTime + size <= timeValue) { + // jump to appropriate window + long move = (timeValue - curTime - size) / slide + 1; + curTime += move * slide; + } + long slideTime = curTime; + while (slideTime <= timeValue && slideTime + size > timeValue) { + properColumnBuilders.get(0).writeLong(slideTime); + properColumnBuilders.get(1).writeLong(slideTime + size); + passThroughIndexBuilder.writeLong(curIndex); + slideTime += slide; + } + curIndex++; + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBFillTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBFillTableIT.java index 3884b1f2be1..12e9129f49d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBFillTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBFillTableIT.java @@ -298,14 
+298,14 @@ public class IoTDBFillTableIT { "1970-01-01T00:00:00.002Z,shanghai,d1,212,2111,", }; tableResultSetEqualTest( - "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS FILL_GROUP 2,3", + "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS FILL_GROUP 2,3 order by 2,3", expectedHeader, retArray, DATABASE_NAME); // case11: all without time filter using previous fill with FILL_GROUP and TIME_COLUMN tableResultSetEqualTest( - "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS TIME_COLUMN 1 FILL_GROUP 2,3", + "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS TIME_COLUMN 1 FILL_GROUP 2,3 order by 2,3", expectedHeader, retArray, DATABASE_NAME); @@ -326,7 +326,7 @@ public class IoTDBFillTableIT { "1970-01-01T00:00:00.002Z,shanghai,d1,212,2111,", }; tableResultSetEqualTest( - "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS TIME_BOUND 2ms FILL_GROUP 2,3", + "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS TIME_BOUND 2ms FILL_GROUP 2,3 order by 2,3", expectedHeader, retArray, DATABASE_NAME); @@ -334,7 +334,7 @@ public class IoTDBFillTableIT { // case13: all without time filter using previous fill with TIME_BOUND, FILL_GROUP and // TIME_COLUMN tableResultSetEqualTest( - "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS TIME_BOUND 2ms TIME_COLUMN 1 FILL_GROUP 2,3", + "select time,city,device_id,s1,s2 from table2 FILL METHOD PREVIOUS TIME_BOUND 2ms TIME_COLUMN 1 FILL_GROUP 2,3 order by 2,3", expectedHeader, retArray, DATABASE_NAME); @@ -503,14 +503,14 @@ public class IoTDBFillTableIT { "1970-01-01T00:00:00.002Z,shanghai,d1,212,null,", }; tableResultSetEqualTest( - "select time,city,device_id,s1,s2 from table3 FILL METHOD LINEAR FILL_GROUP 2,3", + "select time,city,device_id,s1,s2 from table3 FILL METHOD LINEAR FILL_GROUP 2,3 order by 2,3", expectedHeader, retArray, DATABASE_NAME); // case11: all without time filter using linear fill with FILL_GROUP and 
TIME_COLUMN tableResultSetEqualTest( - "select time,city,device_id,s1,s2 from table3 FILL METHOD LINEAR TIME_COLUMN 1 FILL_GROUP 2,3", + "select time,city,device_id,s1,s2 from table3 FILL METHOD LINEAR TIME_COLUMN 1 FILL_GROUP 2,3 order by 2,3", expectedHeader, retArray, DATABASE_NAME); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index a0144a59e38..0ad7ab155ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -67,6 +67,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.DeviceViewInt import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; @@ -926,6 +927,16 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter return render(node, boxValue, context); } + @Override + public List<String> visitAuxSort(AuxSortNode node, GraphContext context) { + List<String> boxValue = new ArrayList<>(); + boxValue.add(String.format("AuxSort-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("EnableParalleled: %s", node.isEnableParalleled())); + boxValue.add(String.format("PartitionKeyCount: %s", 
node.getPartitionKeyCount())); + boxValue.add(String.format("OrderingScheme: %s", node.getOrderingScheme())); + return render(node, boxValue, context); + } + @Override public List<String> visitMergeSort( org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode node, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 38145ce0194..04308a71323 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -117,6 +117,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; @@ -296,6 +297,7 @@ public enum PlanNodeType { MARK_DISTINCT_NODE((short) 1026), TABLE_FUNCTION_NODE((short) 1027), TABLE_FUNCTION_PROCESSOR_NODE((short) 1028), + TABLE_AUX_SORT_NODE((short) 1029), RELATIONAL_INSERT_TABLET((short) 2000), RELATIONAL_INSERT_ROW((short) 2001), @@ -669,6 +671,8 @@ public enum PlanNodeType { return TableFunctionNode.deserialize(buffer); case 1028: return TableFunctionProcessorNode.deserialize(buffer); + case 1029: + return 
AuxSortNode.deserialize(buffer); case 2000: return RelationalInsertTabletNode.deserialize(buffer); case 2001: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 0100cbbaeef..da861abf15f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -748,6 +748,11 @@ public abstract class PlanVisitor<R, C> { return visitSingleChildProcess(node, context); } + public R visitAuxSort( + org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode node, C context) { + return visitSingleChildProcess(node, context); + } + public R visitTopK( org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode node, C context) { return visitMultiChildProcess(node, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java index 508408571bc..97ad4a78d8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java @@ -53,6 +53,18 @@ public class OrderingScheme { this.orderings = ImmutableMap.copyOf(orderings); } + public static OrderingScheme combine(OrderingScheme a, OrderingScheme b) { + List<Symbol> orderBy = new ArrayList<>(a.orderBy); + Map<Symbol, SortOrder> orderings = new HashMap<>(a.orderings); + for (Symbol symbol : b.orderBy) { + if (!orderings.containsKey(symbol)) { + orderBy.add(symbol); + orderings.put(symbol, b.orderings.get(symbol)); + } + } + 
return new OrderingScheme(orderBy, orderings); + } + public List<Symbol> getOrderBy() { return orderBy; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java index d0da64317b4..26e57030fe1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GapFillStartAndEndTimeExtractVisitor; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Aggregation; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; @@ -821,8 +822,11 @@ public class QueryPlanner { OrderingScheme orderingScheme = new OrderingScheme(orderBySymbols.build(), orderings); analysis.setSortNode(true); return subPlan.withNewRoot( - new SortNode( - queryIdAllocator.genPlanNodeId(), subPlan.getRoot(), orderingScheme, false, false)); + new AuxSortNode( + queryIdAllocator.genPlanNodeId(), + subPlan.getRoot(), + orderingScheme, + groupingKeys.size())); } private PlanBuilder distinct( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 91f357168ef..49b1bf790fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -43,6 +43,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; @@ -100,7 +101,6 @@ import java.util.stream.IntStream; import static com.google.common.collect.ImmutableList.toImmutableList; import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED; import static org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction.DATE_BIN; -import static org.apache.iotdb.db.queryengine.plan.planner.TableOperatorGenerator.createTreeDeviceIdColumnValueExtractor; import static org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.GROUP_KEY_SUFFIX; import static org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.SEPARATOR; import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.SINGLE; @@ -189,14 +189,20 @@ public class TableDistributedPlanGenerator if (!(node instanceof 
ValueFillNode)) { context.clearExpectedOrderingScheme(); } + boolean parallel = + (node.getChild() instanceof AuxSortNode) + && ((AuxSortNode) node.getChild()).isEnableParalleled(); List<PlanNode> childrenNodes = node.getChild().accept(this, context); OrderingScheme childOrdering = nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId()); if (childOrdering != null) { nodeOrderingMap.put(node.getPlanNodeId(), childOrdering); } - - node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, childrenNodes)); - return Collections.singletonList(node); + if (!parallel) { + node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, childrenNodes)); + return Collections.singletonList(node); + } else { + return splitForEachChild(node, childrenNodes); + } } @Override @@ -272,9 +278,7 @@ public class TableDistributedPlanGenerator ProjectNode subProjectNode = new ProjectNode(queryId.genPlanNodeId(), child, node.getAssignments()); resultNodeList.add(subProjectNode); - if (i == 0) { - nodeOrderingMap.put(subProjectNode.getPlanNodeId(), childOrdering); - } + nodeOrderingMap.put(subProjectNode.getPlanNodeId(), childOrdering); } return resultNodeList; } @@ -310,6 +314,36 @@ public class TableDistributedPlanGenerator return Collections.singletonList(newTopKNode); } + @Override + public List<PlanNode> visitAuxSort(AuxSortNode node, PlanContext context) { + boolean pushDown = context.pushDownAuxSort; + try { + context.setPushDownAuxSort(node.isEnableParalleled()); + if (node.isEnableParalleled()) { + List<PlanNode> result = new ArrayList<>(); + context.setExpectedOrderingScheme(node.getOrderingScheme()); + List<PlanNode> childrenNodes = node.getChild().accept(this, context); + for (PlanNode child : childrenNodes) { + if (canSortEliminated( + node.getOrderingScheme(), nodeOrderingMap.get(child.getPlanNodeId()))) { + result.add(child); + } else { + SortNode subSortNode = + new SortNode( + queryId.genPlanNodeId(), child, node.getOrderingScheme(), false, false); + 
result.add(subSortNode); + nodeOrderingMap.put(subSortNode.getPlanNodeId(), subSortNode.getOrderingScheme()); + } + } + return result; + } else { + return visitSort(node, context); + } + } finally { + context.pushDownAuxSort = pushDown; + } + } + @Override public List<PlanNode> visitSort(SortNode node, PlanContext context) { context.setExpectedOrderingScheme(node.getOrderingScheme()); @@ -433,9 +467,7 @@ public class TableDistributedPlanGenerator FilterNode subFilterNode = new FilterNode(queryId.genPlanNodeId(), child, node.getPredicate()); resultNodeList.add(subFilterNode); - if (i == 0) { - nodeOrderingMap.put(subFilterNode.getPlanNodeId(), childOrdering); - } + nodeOrderingMap.put(subFilterNode.getPlanNodeId(), childOrdering); } return resultNodeList; } @@ -483,8 +515,79 @@ public class TableDistributedPlanGenerator public List<PlanNode> visitDeviceTableScan( final DeviceTableScanNode node, final PlanContext context) { + if (context.isPushDownAuxSort()) { + return constructDeviceTableScanByTags(node, context); + } else { + return constructDeviceTableScanByRegionReplicaSet(node, context); + } + } + + private List<PlanNode> constructDeviceTableScanByTags( + final DeviceTableScanNode node, final PlanContext context) { + List<PlanNode> result = new ArrayList<>(); + List<DeviceEntry> crossRegionDevices = new ArrayList<>(); + final Map<TRegionReplicaSet, DeviceTableScanNode> tableScanNodeMap = new HashMap<>(); + final Map<TRegionReplicaSet, Integer> regionDeviceCount = new HashMap<>(); + for (final DeviceEntry deviceEntry : node.getDeviceEntries()) { + final List<TRegionReplicaSet> regionReplicaSets = + analysis.getDataRegionReplicaSetWithTimeFilter( + node.getQualifiedObjectName().getDatabaseName(), + deviceEntry.getDeviceID(), + node.getTimeFilter()); + regionReplicaSets.forEach( + regionReplicaSet -> + regionDeviceCount.put( + regionReplicaSet, regionDeviceCount.getOrDefault(regionReplicaSet, 0) + 1)); + if (regionReplicaSets.size() != 1) { + 
crossRegionDevices.add(deviceEntry); + continue; + } + final DeviceTableScanNode deviceTableScanNode = + tableScanNodeMap.computeIfAbsent( + regionReplicaSets.get(0), + k -> { + final DeviceTableScanNode scanNode = + new DeviceTableScanNode( + queryId.genPlanNodeId(), + node.getQualifiedObjectName(), + node.getOutputSymbols(), + node.getAssignments(), + new ArrayList<>(), + node.getIdAndAttributeIndexMap(), + node.getScanOrder(), + node.getTimePredicate().orElse(null), + node.getPushDownPredicate(), + node.getPushDownLimit(), + node.getPushDownOffset(), + node.isPushLimitToEachDevice(), + node.containsNonAlignedDevice()); + scanNode.setRegionReplicaSet(regionReplicaSets.get(0)); + return scanNode; + }); + deviceTableScanNode.appendDeviceEntry(deviceEntry); + } + result.addAll(tableScanNodeMap.values()); + if (context.hasSortProperty) { + processSortProperty(node, result, context); + } + context.mostUsedRegion = + regionDeviceCount.entrySet().stream() + .max(Comparator.comparingInt(Map.Entry::getValue)) + .map(Map.Entry::getKey) + .orElse(null); + if(!crossRegionDevices.isEmpty()) { + node.setDeviceEntries(crossRegionDevices); + result.add( + new CollectNode(queryId.genPlanNodeId(), constructDeviceTableScanByRegionReplicaSet(node, context), node.getOutputSymbols())); + } + return result; + } + private List<PlanNode> constructDeviceTableScanByRegionReplicaSet( + final DeviceTableScanNode node, final PlanContext context) { + + final Map<TRegionReplicaSet, DeviceTableScanNode> tableScanNodeMap = new HashMap<>(); for (final DeviceEntry deviceEntry : node.getDeviceEntries()) { final List<TRegionReplicaSet> regionReplicaSets = analysis.getDataRegionReplicaSetWithTimeFilter( @@ -497,7 +600,7 @@ public class TableDistributedPlanGenerator tableScanNodeMap.computeIfAbsent( regionReplicaSet, k -> { - DeviceTableScanNode scanNode = + final DeviceTableScanNode scanNode = new DeviceTableScanNode( queryId.genPlanNodeId(), node.getQualifiedObjectName(), @@ -518,7 +621,6 @@ public 
class TableDistributedPlanGenerator deviceTableScanNode.appendDeviceEntry(deviceEntry); } } - if (tableScanNodeMap.isEmpty()) { node.setRegionReplicaSet(NOT_ASSIGNED); return Collections.singletonList(node); @@ -685,7 +787,6 @@ public class TableDistributedPlanGenerator return resultTableScanNodeList; } - // TODO(UDF): 参考 visitAggregation 和 visitSort @Override public List<PlanNode> visitAggregation(AggregationNode node, PlanContext context) { if (node.isStreamable()) { @@ -851,22 +952,40 @@ public class TableDistributedPlanGenerator return Collections.singletonList(node); } + private List<PlanNode> splitForEachChild(PlanNode node, List<PlanNode> childrenNodes) { + ImmutableList.Builder<PlanNode> result = ImmutableList.builder(); + for (PlanNode child : childrenNodes) { + PlanNode subNode = node.clone(); + subNode.addChild(child); + subNode.setPlanNodeId(queryId.genPlanNodeId()); + result.add(subNode); + } + return result.build(); + } + @Override public List<PlanNode> visitTableFunctionProcessor( TableFunctionProcessorNode node, PlanContext context) { + context.clearExpectedOrderingScheme(); if (node.getChildren().isEmpty()) { return Collections.singletonList(node); } + boolean parallel = + (node.getChild() instanceof AuxSortNode) + && ((AuxSortNode) node.getChild()).isEnableParalleled(); List<PlanNode> childrenNodes = node.getChild().accept(this, context); if (childrenNodes.size() == 1) { node.setChild(childrenNodes.get(0)); - } else { + return Collections.singletonList(node); + } else if (!parallel) { CollectNode collectNode = new CollectNode(queryId.genPlanNodeId(), node.getChildren().get(0).getOutputSymbols()); childrenNodes.forEach(collectNode::addChild); node.setChild(collectNode); + return Collections.singletonList(node); + } else { + return splitForEachChild(node, childrenNodes); } - return Collections.singletonList(node); } private void buildRegionNodeMap( @@ -1265,6 +1384,7 @@ public class TableDistributedPlanGenerator final Map<PlanNodeId, 
NodeDistribution> nodeDistributionMap; boolean hasExchangeNode = false; boolean hasSortProperty = false; + boolean pushDownAuxSort = false; OrderingScheme expectedOrderingScheme; TRegionReplicaSet mostUsedRegion; @@ -1285,5 +1405,13 @@ public class TableDistributedPlanGenerator this.expectedOrderingScheme = expectedOrderingScheme; hasSortProperty = true; } + + public void setPushDownAuxSort(boolean pushDownAuxSort) { + this.pushDownAuxSort = pushDownAuxSort; + } + + public boolean isPushDownAuxSort() { + return pushDownAuxSort; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java index 43c52a78282..d2596b45593 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkN import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FillNode; @@ -203,6 +204,12 @@ public class TableModelTypeProviderExtractor { return null; } + @Override + public Void visitAuxSort(AuxSortNode node, Void context) { + 
node.getChild().accept(this, context); + return null; + } + @Override public Void visitMergeSort(MergeSortNode node, Void context) { node.getChildren().forEach(c -> c.accept(this, context)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java index 810a008e335..b189d2d5e97 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java @@ -24,16 +24,19 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; @@ 
-125,26 +128,30 @@ public class ImplementTableFunctionSource implements Rule<TableFunctionNode> { .getDataOrganizationSpecification() .ifPresent( dataOrganizationSpecification -> { - ImmutableList.Builder<Symbol> orderBy = ImmutableList.builder(); - ImmutableMap.Builder<Symbol, SortOrder> orderings = ImmutableMap.builder(); + List<Symbol> sortSymbols = new ArrayList<>(); + Map<Symbol, SortOrder> sortOrderings = new HashMap<>(); for (Symbol symbol : dataOrganizationSpecification.getPartitionBy()) { - orderBy.add(symbol); - orderings.put(symbol, ASC_NULLS_LAST); + sortSymbols.add(symbol); + sortOrderings.put(symbol, ASC_NULLS_LAST); } + int sortKeyOffset = sortSymbols.size(); dataOrganizationSpecification .getOrderingScheme() .ifPresent( orderingScheme -> { - orderBy.addAll(orderingScheme.getOrderBy()); - orderings.putAll(orderingScheme.getOrderings()); + for (Symbol symbol : orderingScheme.getOrderBy()) { + if (!sortOrderings.containsKey(symbol)) { + sortSymbols.add(symbol); + sortOrderings.put(symbol, orderingScheme.getOrdering(symbol)); + } + } }); child.set( - new SortNode( + new AuxSortNode( context.getIdAllocator().genPlanNodeId(), child.get(), - new OrderingScheme(orderBy.build(), orderings.build()), - false, - false)); + new OrderingScheme(sortSymbols, sortOrderings), + sortKeyOffset)); }); return Result.ofPlanNode( new TableFunctionProcessorNode( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AuxSortNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AuxSortNode.java new file mode 100644 index 00000000000..3c72a0ee8ba --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AuxSortNode.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+
+import com.google.common.collect.Iterables;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Auxiliary sort node. It is not generated directly by a user query; it is inserted by special
+ * processing such as table functions.
+ */
+public class AuxSortNode extends SortNode {
+
+  /** Parallelization flag; {@code false} until changed via {@link #setEnableParalleled}. */
+  private boolean enableParalleled = false;
+
+  /**
+   * AuxSort#orderingScheme may include two parts: partition key and sort key. For example,
+   * PARTITION BY device_id, color ORDER BY time will construct an AuxSortNode with
+   * OrderingScheme[device_id, color, time] and partitionKeyCount = 2.
+   */
+  private int partitionKeyCount;
+
+  /**
+   * Creates a node with the parallelization flag off. {@code partial} and {@code
+   * orderByAllIdsAndTime} are passed to {@link SortNode} as {@code false}.
+   *
+   * @param partitionKeyCount number of leading symbols of {@code scheme} that are partition keys
+   */
+  public AuxSortNode(PlanNodeId id, PlanNode child, OrderingScheme scheme, int partitionKeyCount) {
+    super(id, child, scheme, false, false);
+    this.partitionKeyCount = partitionKeyCount;
+  }
+
+  /** Creates a node with every attribute given explicitly; used when copying or deserializing. */
+  public AuxSortNode(
+      PlanNodeId id,
+      PlanNode child,
+      OrderingScheme scheme,
+      boolean partial,
+      boolean orderByAllIdsAndTime,
+      boolean enableParalleled,
+      int partitionKeyCount) {
+    super(id, child, scheme, partial, orderByAllIdsAndTime);
+    this.enableParalleled = enableParalleled;
+    this.partitionKeyCount = partitionKeyCount;
+  }
+
+  /** Returns a copy of this node over the single element of {@code newChildren}. */
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    return new AuxSortNode(
+        id,
+        Iterables.getOnlyElement(newChildren),
+        orderingScheme,
+        partial,
+        orderByAllIdsAndTime,
+        enableParalleled,
+        partitionKeyCount);
+  }
+
+  public boolean isEnableParalleled() {
+    return enableParalleled;
+  }
+
+  public void setEnableParalleled(boolean enableParalleled) {
+    this.enableParalleled = enableParalleled;
+  }
+
+  /** Returns how many leading symbols of the ordering scheme are partition keys. */
+  public int getPartitionKeyCount() {
+    return partitionKeyCount;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitAuxSort(this, context);
+  }
+
+  /** Returns a copy with the same id and attributes but a {@code null} child. */
+  @Override
+  public PlanNode clone() {
+    return new AuxSortNode(
+        id,
+        null,
+        orderingScheme,
+        partial,
+        orderByAllIdsAndTime,
+        enableParalleled,
+        partitionKeyCount);
+  }
+
+  /**
+   * Writes the node type marker, the ordering scheme, the parallelization flag and the partition
+   * key count, in that order. {@code partial} and {@code orderByAllIdsAndTime} are not written.
+   */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TABLE_AUX_SORT_NODE.serialize(byteBuffer);
+    orderingScheme.serialize(byteBuffer);
+    ReadWriteIOUtils.write(enableParalleled, byteBuffer);
+    ReadWriteIOUtils.write(partitionKeyCount, byteBuffer);
+  }
+
+  /** Stream variant of {@link #serializeAttributes(ByteBuffer)}; same fields, same order. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.TABLE_AUX_SORT_NODE.serialize(stream);
+    orderingScheme.serialize(stream);
+    ReadWriteIOUtils.write(enableParalleled, stream);
+    ReadWriteIOUtils.write(partitionKeyCount, stream);
+  }
+
+  /**
+   * Reads the ordering scheme, the parallelization flag, the partition key count and then the
+   * plan node id. {@code partial} and {@code orderByAllIdsAndTime} are restored as {@code false}
+   * and the child is left {@code null}.
+   *
+   * <p>NOTE(review): the node type marker is not read here; presumably the caller has already
+   * consumed it - confirm against the PlanNodeType dispatch code.
+   */
+  public static AuxSortNode deserialize(ByteBuffer byteBuffer) {
+    OrderingScheme orderingScheme = OrderingScheme.deserialize(byteBuffer);
+    boolean enableParalleled = ReadWriteIOUtils.readBoolean(byteBuffer);
+    int partitionColumnCount = ReadWriteIOUtils.readInt(byteBuffer);
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new AuxSortNode(
+        planNodeId, null, orderingScheme, false, false, enableParalleled, partitionColumnCount);
+  }
+
+  @Override
+  public String toString() {
+    return "AuxSortNode-" + this.getPlanNodeId();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index 3b7b7e6267b..ae3dff9d838 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -260,7 +260,8 @@ public class LogicalOptimizeFactory {
         new IterativeOptimizer(
             plannerContext,
             ruleStats,
-            ImmutableSet.of(new MergeLimitWithSort(), new MergeLimitOverProjectWithSort())));
+            ImmutableSet.of(new MergeLimitWithSort(), new MergeLimitOverProjectWithSort())),
+        new ParallelizeAuxSort());
 
     this.planOptimizers = optimizerBuilder.build();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java
new file mode 100644
index 00000000000..d175331533c
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to
the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG;
+import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeAuxSort.CanPushDown.ENABLE;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeAuxSort.CanPushDown.PENDING;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeAuxSort.CanPushDown.UNABLE;
+
+/**
+ * Plan optimizer that walks the plan and switches on {@link AuxSortNode#setEnableParalleled} for
+ * every {@link AuxSortNode} whose subtree traversal finishes in the {@link CanPushDown#ENABLE}
+ * state. The plan is rebuilt from clones; non-query statements are returned untouched.
+ */
+public class ParallelizeAuxSort implements PlanOptimizer {
+
+  /**
+   * Rewrites {@code plan} when the analyzed statement is a query.
+   *
+   * @param plan root of the plan to inspect
+   * @param context optimizer context providing the {@link Analysis}
+   * @return {@code plan} itself for non-queries, otherwise the rewritten copy
+   */
+  @Override
+  public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
+    if (!(context.getAnalysis().isQuery())) {
+      return plan;
+    }
+    // The root context carries no order key, so every check is skipped until the traversal
+    // reaches the first AuxSortNode, which installs its own context for its subtree.
+    return plan.accept(new Rewriter(context.getAnalysis()), new Context(null, 0));
+  }
+
+  private static class Rewriter extends PlanVisitor<PlanNode, Context> {
+    private final Analysis analysis;
+
+    public Rewriter(Analysis analysis) {
+      this.analysis = analysis;
+    }
+
+    /** Default handling: clone the node and attach the rewritten children in order. */
+    @Override
+    public PlanNode visitPlan(PlanNode node, Context context) {
+      PlanNode newNode = node.clone();
+      for (PlanNode child : node.getChildren()) {
+        newNode.addChild(child.accept(this, context));
+      }
+      return newNode;
+    }
+
+    /**
+     * Decides the pending push-down state from an ordering found below the AuxSortNode. Does
+     * nothing when the context has no order key or is already decided. We need to make sure:
+     *
+     * <ul>
+     *   <li>(1) All keys in context#orderKey are used for partition.
+     *   <li>(2) childOrder starts with exactly the keys of context#orderKey, so that
+     *       partition-based operation can be pushed down.
+     * </ul>
+     *
+     * <p>The state becomes ENABLE when both hold and UNABLE otherwise.
+     */
+    private void checkPrefixMatch(Context context, List<Symbol> childOrder) {
+      if (context.canSkip()) {
+        return;
+      }
+      List<Symbol> prefix = context.orderKey.getOrderBy();
+      if (prefix.size() != context.partitionKeyCount) {
+        context.canPushDown = UNABLE;
+        return;
+      }
+      if (prefix.size() > childOrder.size()) {
+        context.canPushDown = UNABLE;
+        return;
+      }
+      for (int i = 0; i < prefix.size(); i++) {
+        if (!prefix.get(i).equals(childOrder.get(i))) {
+          context.canPushDown = UNABLE;
+          return;
+        }
+      }
+      context.canPushDown = ENABLE;
+    }
+
+    /**
+     * Checks this node's ordering against the enclosing context, then analyses its own subtree
+     * with a fresh context and enables parallelism on the copy when that subtree ends in ENABLE.
+     */
+    @Override
+    public PlanNode visitAuxSort(AuxSortNode node, Context context) {
+      checkPrefixMatch(context, node.getOrderingScheme().getOrderBy());
+      Context newContext = new Context(node.getOrderingScheme(), node.getPartitionKeyCount());
+      AuxSortNode newNode = (AuxSortNode) node.clone();
+      newNode.addChild(node.getChild().accept(this, newContext));
+      if (newContext.canPushDown == ENABLE) {
+        newNode.setEnableParalleled(true);
+      }
+      return newNode;
+    }
+
+    @Override
+    public PlanNode visitSort(SortNode node, Context context) {
+      checkPrefixMatch(context, node.getOrderingScheme().getOrderBy());
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitStreamSort(StreamSortNode node, Context context) {
+      checkPrefixMatch(context, node.getOrderingScheme().getOrderBy());
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitTopK(TopKNode node, Context context) {
+      checkPrefixMatch(context, node.getOrderingScheme().getOrderBy());
+      return visitPlan(node, context);
+    }
+
+    /** Any join below the sort rules out the push-down. */
+    @Override
+    public PlanNode visitJoin(JoinNode node, Context context) {
+      context.canPushDown = UNABLE;
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitCorrelatedJoin(CorrelatedJoinNode node, Context context) {
+      context.canPushDown = UNABLE;
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitSemiJoin(SemiJoinNode node, Context context) {
+      context.canPushDown = UNABLE;
+      return visitPlan(node, context);
+    }
+
+    /**
+     * A table function without input (leaf) or without a data organization specification rules
+     * out the push-down; otherwise its PARTITION BY keys are matched against the order key.
+     */
+    @Override
+    public PlanNode visitTableFunctionProcessor(TableFunctionProcessorNode node, Context context) {
+      if (!context.canSkip()) {
+        if (node.getChildren().isEmpty()) {
+          // leaf node
+          context.canPushDown = UNABLE;
+          return node;
+        }
+        Optional<DataOrganizationSpecification> dataOrganizationSpecification =
+            node.getDataOrganizationSpecification();
+        if (!dataOrganizationSpecification.isPresent()) {
+          context.canPushDown = UNABLE;
+        } else {
+          checkPrefixMatch(context, dataOrganizationSpecification.get().getPartitionBy());
+        }
+      }
+      return visitPlan(node, context);
+    }
+
+    /** Every order key symbol must be among the projection's assignments. */
+    @Override
+    public PlanNode visitProject(ProjectNode node, Context context) {
+      if (!context.canSkip()) {
+        for (Symbol symbol : context.orderKey.getOrderBy()) {
+          if (!node.getAssignments().contains(symbol)) {
+            context.canPushDown = UNABLE;
+            break;
+          }
+        }
+      }
+      return visitPlan(node, context);
+    }
+
+    /**
+     * Final decision at the scan: ENABLE only when the partition keys are tag or attribute
+     * columns of the table, cover every tag column, and are followed by at most one extra sort
+     * key, which must be the time column.
+     */
+    @Override
+    public PlanNode visitDeviceTableScan(DeviceTableScanNode node, Context context) {
+      if (!context.canSkip()) {
+        OrderingScheme orderKey = context.orderKey;
+        Map<Symbol, ColumnSchema> tableColumnSchema =
+            analysis.getTableColumnSchema(node.getQualifiedObjectName());
+        // 1. It is possible for the last sort key to be a time column
+        if (orderKey.getOrderBy().size() > context.partitionKeyCount + 1) {
+          context.canPushDown = UNABLE;
+          return node;
+        } else if (orderKey.getOrderBy().size() == context.partitionKeyCount + 1) {
+          Symbol lastSymbol = orderKey.getOrderBy().get(context.partitionKeyCount);
+          if (!tableColumnSchema.containsKey(lastSymbol)
+              || tableColumnSchema.get(lastSymbol).getColumnCategory() != TIME) {
+            context.canPushDown = UNABLE;
+            return node;
+          }
+        }
+        // 2. Check that the partition keys contain no field column and contain every tag column
+        Set<Symbol> tagSymbols =
+            tableColumnSchema.entrySet().stream()
+                .filter(entry -> entry.getValue().getColumnCategory() == TAG)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toSet());
+        for (int i = 0; i < context.partitionKeyCount; i++) {
+          Symbol symbol = orderKey.getOrderBy().get(i);
+          if (!tableColumnSchema.containsKey(symbol)) {
+            context.canPushDown = UNABLE;
+            return node;
+          }
+          switch (tableColumnSchema.get(symbol).getColumnCategory()) {
+            case TAG:
+              tagSymbols.remove(symbol);
+              break;
+            case ATTRIBUTE:
+              // If all tags in partition key, attributes must be the same in one partition.
+              break;
+            default:
+              context.canPushDown = UNABLE;
+              return node;
+          }
+        }
+        if (!tagSymbols.isEmpty()) {
+          // At least one tag column is missing from the partition keys.
+          context.canPushDown = UNABLE;
+          return node;
+        }
+        context.canPushDown = ENABLE;
+      }
+      return node;
+    }
+
+    // NOTE(review): the two aggregation overrides only delegate to the superclass and leave the
+    // push-down state untouched - confirm whether aggregations should rule out the push-down.
+    @Override
+    public PlanNode visitAggregation(AggregationNode node, Context context) {
+      return super.visitAggregation(node, context);
+    }
+
+    @Override
+    public PlanNode visitAggregationTableScan(AggregationTableScanNode node, Context context) {
+      return super.visitAggregationTableScan(node, context);
+    }
+  }
+
+  /** Traversal state for the subtree below one AuxSortNode. */
+  private static class Context {
+    /** Ordering scheme of the enclosing AuxSortNode; {@code null} outside any AuxSortNode. */
+    private final OrderingScheme orderKey;
+
+    /** Number of leading symbols of {@link #orderKey} that are partition keys. */
+    private final int partitionKeyCount;
+
+    private CanPushDown canPushDown = PENDING;
+
+    private Context(OrderingScheme orderKey, int partitionKeyCount) {
+      this.orderKey = orderKey;
+      this.partitionKeyCount = partitionKeyCount;
+    }
+
+    /** True when there is nothing to check or the decision has already been made. */
+    private boolean canSkip() {
+      return orderKey == null || canPushDown != PENDING;
+    }
+  }
+
+  protected enum CanPushDown {
+    ENABLE,
+    UNABLE,
+    PENDING
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
index 7ca40899d00..f3cb9f99776 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
@@ -219,6 +220,11 @@ public class PushLimitOffsetIntoTableScan implements PlanOptimizer {
       return visitSort(node, context);
     }
 
+    /** Handles an AuxSortNode the same way as a regular sort by delegating to visitSort. */
+    @Override
+    public PlanNode visitAuxSort(AuxSortNode node, Context context) {
+      return visitSort(node, context);
+    }
+
     @Override
     public PlanNode visitAggregation(AggregationNode node, Context context) {
       context.enablePushDown = false;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
index c514a46746b..f66bd9f25f6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
@@ -120,6 +121,11 @@ public class TransformSortToStreamSort implements PlanOptimizer {
       return node;
     }
 
+    /** Processes an AuxSortNode through the generic visitSingleChildProcess path. */
+    @Override
+    public PlanNode visitAuxSort(AuxSortNode node, Context context) {
+      return visitSingleChildProcess(node, context);
+    }
+
     @Override
     public PlanNode visitDeviceTableScan(DeviceTableScanNode node, Context context) {
       context.setTableScanNode(node);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
index ce6e5e50457..341add0ab31 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.DeterminismEvaluator;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
@@ -423,6 +424,26 @@ public class UnaliasSymbolReferences implements PlanOptimizer {
           mapping);
     }
 
+    /**
+     * Rewrites the child first, maps the ordering scheme's symbols through the child's symbol
+     * mapping, and rebuilds the AuxSortNode with the same id, flags and partition key count.
+     */
+    @Override
+    public PlanAndMappings visitAuxSort(AuxSortNode node, UnaliasContext context) {
+      PlanAndMappings rewrittenSource = node.getChild().accept(this, context);
+      // Work on a copy of the child's mappings.
+      Map<Symbol, Symbol> mapping = new HashMap<>(rewrittenSource.getMappings());
+      SymbolMapper mapper = symbolMapper(mapping);
+
+      OrderingScheme newOrderingScheme = mapper.map(node.getOrderingScheme());
+
+      return new PlanAndMappings(
+          new AuxSortNode(
+              node.getPlanNodeId(),
+              rewrittenSource.getRoot(),
+              newOrderingScheme,
+              node.isPartial(),
+              node.isOrderByAllIdsAndTime(),
+              node.isEnableParalleled(),
+              node.getPartitionKeyCount()),
+          mapping);
+    }
+
     @Override
     public PlanAndMappings visitFilter(FilterNode node, UnaliasContext context) {
       PlanAndMappings rewrittenSource = node.getChild().accept(this, context);
