This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch udtf-optimize in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4ba48079a1b78ca7977744dc38f5079b54b80dbe Author: Chen YZ <[email protected]> AuthorDate: Sat Feb 22 22:53:38 2025 +0800 fix pass through --- .../iotdb/confignode/conf/ConfigNodeConfig.java | 4 ++-- .../process/function/PartitionRecognizer.java | 8 ++++++-- .../process/function/TableFunctionOperator.java | 7 +++---- .../operator/process/function/partition/Slice.java | 20 ++++++++++++-------- .../process/function/partition/SliceCache.java | 2 +- .../plan/planner/TableOperatorGenerator.java | 13 +++++++++++-- .../planner/optimizations/ParallelizeAuxSort.java | 7 ------- 7 files changed, 35 insertions(+), 26 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index ffb61da07e1..183f41294e2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -97,7 +97,7 @@ public class ConfigNodeConfig { /** The policy of extension DataRegionGroup for each Database. */ private RegionGroupExtensionPolicy dataRegionGroupExtensionPolicy = - RegionGroupExtensionPolicy.AUTO; + RegionGroupExtensionPolicy.CUSTOM; /** * When set data_region_group_extension_policy=CUSTOM, this parameter is the default number of @@ -110,7 +110,7 @@ public class ConfigNodeConfig { * The maximum number of DataRegions expected to be managed by each DataNode. Set to 0 means that * each dataNode automatically has the number of CPU cores / 2 regions. */ - private int dataRegionPerDataNode = 0; + private int dataRegionPerDataNode = 1; /** each dataNode automatically has the number of CPU cores / 2 regions. */ private double dataRegionPerDataNodeProportion = 0.5; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java index 6068aeadc39..bec4668f5a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java @@ -35,7 +35,8 @@ public class PartitionRecognizer { private final List<Integer> partitionChannels; private final List<Object> partitionValues; - private final int[] requiredChannels; + private final List<Integer> requiredChannels; + private final List<Integer> passThroughChannels; private final List<Type> inputDataTypes; private TsBlock currentTsBlock = null; private boolean noMoreData = false; @@ -45,13 +46,15 @@ public class PartitionRecognizer { public PartitionRecognizer( List<Integer> partitionChannels, List<Integer> requiredChannels, + List<Integer> passThroughChannels, List<TSDataType> inputDataTypes) { this.partitionChannels = partitionChannels; this.partitionValues = new ArrayList<>(partitionChannels.size()); for (int i = 0; i < partitionChannels.size(); i++) { partitionValues.add(null); } - this.requiredChannels = requiredChannels.stream().mapToInt(i -> i).toArray(); + this.requiredChannels = requiredChannels; + this.passThroughChannels = passThroughChannels; this.inputDataTypes = UDFDataTypeTransformer.transformToUDFDataTypeList(inputDataTypes); } @@ -178,6 +181,7 @@ public class PartitionRecognizer { endPartitionIndex, currentTsBlock.getValueColumns(), requiredChannels, + passThroughChannels, inputDataTypes); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java index 23edce9417f..7a2d4835cb7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java @@ -25,7 +25,6 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperato import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState; import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.SliceCache; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; import org.apache.iotdb.udf.api.relational.access.Record; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor; @@ -42,7 +41,6 @@ import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.Optional; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; @@ -73,14 +71,15 @@ public class TableFunctionOperator implements ProcessOperator { List<TSDataType> outputDataTypes, int properChannelCount, List<Integer> requiredChannels, - Optional<TableFunctionNode.PassThroughSpecification> passThroughSpecifications, + List<Integer> passThroughChannels, List<Integer> partitionChannels) { this.operatorContext = operatorContext; this.inputOperator = inputOperator; this.properChannelCount = properChannelCount; this.processorProvider = processorProvider; this.partitionRecognizer = - new PartitionRecognizer(partitionChannels, requiredChannels, inputDataTypes); + new PartitionRecognizer( + partitionChannels, requiredChannels, passThroughChannels, inputDataTypes); this.needPassThrough = properChannelCount != outputDataTypes.size(); this.partitionState = null; this.blockBuilder = new TsBlockBuilder(outputDataTypes); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java index 19b7d47f4e9..f488ef89a62 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java @@ -38,21 +38,25 @@ public class Slice { private final int startIndex; private final int endIndex; private final Column[] requiredColumns; - private final Column[] originalColumns; + private final Column[] passThroughColumns; private final List<Type> dataTypes; public Slice( int startIndex, int endIndex, Column[] columns, - int[] requiredChannels, + List<Integer> requiredChannels, + List<Integer> passThroughChannels, List<Type> dataTypes) { this.startIndex = startIndex; this.endIndex = endIndex; - this.originalColumns = columns; - this.requiredColumns = new Column[requiredChannels.length]; - for (int i = 0; i < requiredChannels.length; i++) { - requiredColumns[i] = columns[requiredChannels[i]]; + this.requiredColumns = new Column[requiredChannels.size()]; + for (int i = 0; i < requiredChannels.size(); i++) { + requiredColumns[i] = columns[requiredChannels.get(i)]; + } + this.passThroughColumns = new Column[passThroughChannels.size()]; + for (int i = 0; i < passThroughChannels.size(); i++) { + passThroughColumns[i] = columns[passThroughChannels.get(i)]; } this.dataTypes = dataTypes; } @@ -61,8 +65,8 @@ public class Slice { return endIndex - startIndex; } - public Record getOriginalRecord(int offset) { - return getRecord(startIndex + offset, originalColumns); + public Record getPassThroughRecord(int offset) { + return getRecord(startIndex + offset, passThroughColumns); } public Iterator<Record> getRequiredRecordIterator() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java index 993289fd1a9..cc18f5cb30e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java @@ -34,7 +34,7 @@ public class SliceCache { for (Slice slice : slices) { long currentSize = slice.getSize(); if (index < previousSize + currentSize) { - return slice.getOriginalRecord((int) (index - previousSize)); + return slice.getPassThroughRecord((int) (index - previousSize)); } previousSize += currentSize; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 96af225cd2d..00309823713 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -2384,7 +2384,16 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution makeLayoutFromOutputSymbols(node.getChild().getOutputSymbols()); List<Integer> requiredChannels = getChannelsForSymbols(node.getRequiredSymbols(), childLayout); - + List<Integer> passThroughChannels = + passThroughSpecification + .map( + passThrough -> + getChannelsForSymbols( + passThrough.getColumns().stream() + .map(TableFunctionNode.PassThroughColumn::getSymbol) + .collect(Collectors.toList()), + childLayout)) + .orElse(Collections.emptyList()); List<Integer> partitionChannels; if (node.getDataOrganizationSpecification().isPresent()) { partitionChannels = @@ -2401,7 +2410,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution outputDataTypes, properChannelCount, requiredChannels, - passThroughSpecification, + passThroughChannels, partitionChannels); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java index d175331533c..0acbfc504bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java @@ -67,13 +67,6 @@ public class ParallelizeAuxSort implements PlanOptimizer { // return plan.accept(new Rewriter(context.getAnalysis()), new Context()); } - private void print(PlanNode node) { - PlanGraphPrinter.print(node); - for (PlanNode child : node.getChildren()) { - print(child); - } - } - private static class Rewriter extends PlanVisitor<PlanNode, Context> { private final Analysis analysis;
