This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch udtf-optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4ba48079a1b78ca7977744dc38f5079b54b80dbe
Author: Chen YZ <[email protected]>
AuthorDate: Sat Feb 22 22:53:38 2025 +0800

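    Fix pass-through column handling for table functions (resolve pass-through channels in the planner and project them in Slice)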
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java      |  4 ++--
 .../process/function/PartitionRecognizer.java        |  8 ++++++--
 .../process/function/TableFunctionOperator.java      |  7 +++----
 .../operator/process/function/partition/Slice.java   | 20 ++++++++++++--------
 .../process/function/partition/SliceCache.java       |  2 +-
 .../plan/planner/TableOperatorGenerator.java         | 13 +++++++++++--
 .../planner/optimizations/ParallelizeAuxSort.java    |  7 -------
 7 files changed, 35 insertions(+), 26 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index ffb61da07e1..183f41294e2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -97,7 +97,7 @@ public class ConfigNodeConfig {
 
   /** The policy of extension DataRegionGroup for each Database. */
   private RegionGroupExtensionPolicy dataRegionGroupExtensionPolicy =
-      RegionGroupExtensionPolicy.AUTO;
+      RegionGroupExtensionPolicy.CUSTOM;
 
   /**
    * When set data_region_group_extension_policy=CUSTOM, this parameter is the 
default number of
@@ -110,7 +110,7 @@ public class ConfigNodeConfig {
    * The maximum number of DataRegions expected to be managed by each 
DataNode. Set to 0 means that
    * each dataNode automatically has the number of CPU cores / 2 regions.
    */
-  private int dataRegionPerDataNode = 0;
+  private int dataRegionPerDataNode = 1;
 
   /** each dataNode automatically has the number of CPU cores / 2 regions. */
   private double dataRegionPerDataNodeProportion = 0.5;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java
index 6068aeadc39..bec4668f5a0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java
@@ -35,7 +35,8 @@ public class PartitionRecognizer {
 
   private final List<Integer> partitionChannels;
   private final List<Object> partitionValues;
-  private final int[] requiredChannels;
+  private final List<Integer> requiredChannels;
+  private final List<Integer> passThroughChannels;
   private final List<Type> inputDataTypes;
   private TsBlock currentTsBlock = null;
   private boolean noMoreData = false;
@@ -45,13 +46,15 @@ public class PartitionRecognizer {
   public PartitionRecognizer(
       List<Integer> partitionChannels,
       List<Integer> requiredChannels,
+      List<Integer> passThroughChannels,
       List<TSDataType> inputDataTypes) {
     this.partitionChannels = partitionChannels;
     this.partitionValues = new ArrayList<>(partitionChannels.size());
     for (int i = 0; i < partitionChannels.size(); i++) {
       partitionValues.add(null);
     }
-    this.requiredChannels = requiredChannels.stream().mapToInt(i -> 
i).toArray();
+    this.requiredChannels = requiredChannels;
+    this.passThroughChannels = passThroughChannels;
     this.inputDataTypes = 
UDFDataTypeTransformer.transformToUDFDataTypeList(inputDataTypes);
   }
 
@@ -178,6 +181,7 @@ public class PartitionRecognizer {
         endPartitionIndex,
         currentTsBlock.getValueColumns(),
         requiredChannels,
+        passThroughChannels,
         inputDataTypes);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
index 23edce9417f..7a2d4835cb7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
@@ -25,7 +25,6 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperato
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.SliceCache;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode;
 import org.apache.iotdb.udf.api.relational.access.Record;
 import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
 import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
@@ -42,7 +41,6 @@ import 
org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
 
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
 
@@ -73,14 +71,15 @@ public class TableFunctionOperator implements 
ProcessOperator {
       List<TSDataType> outputDataTypes,
       int properChannelCount,
       List<Integer> requiredChannels,
-      Optional<TableFunctionNode.PassThroughSpecification> 
passThroughSpecifications,
+      List<Integer> passThroughChannels,
       List<Integer> partitionChannels) {
     this.operatorContext = operatorContext;
     this.inputOperator = inputOperator;
     this.properChannelCount = properChannelCount;
     this.processorProvider = processorProvider;
     this.partitionRecognizer =
-        new PartitionRecognizer(partitionChannels, requiredChannels, 
inputDataTypes);
+        new PartitionRecognizer(
+            partitionChannels, requiredChannels, passThroughChannels, 
inputDataTypes);
     this.needPassThrough = properChannelCount != outputDataTypes.size();
     this.partitionState = null;
     this.blockBuilder = new TsBlockBuilder(outputDataTypes);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
index 19b7d47f4e9..f488ef89a62 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
@@ -38,21 +38,25 @@ public class Slice {
   private final int startIndex;
   private final int endIndex;
   private final Column[] requiredColumns;
-  private final Column[] originalColumns;
+  private final Column[] passThroughColumns;
   private final List<Type> dataTypes;
 
   public Slice(
       int startIndex,
       int endIndex,
       Column[] columns,
-      int[] requiredChannels,
+      List<Integer> requiredChannels,
+      List<Integer> passThroughChannels,
       List<Type> dataTypes) {
     this.startIndex = startIndex;
     this.endIndex = endIndex;
-    this.originalColumns = columns;
-    this.requiredColumns = new Column[requiredChannels.length];
-    for (int i = 0; i < requiredChannels.length; i++) {
-      requiredColumns[i] = columns[requiredChannels[i]];
+    this.requiredColumns = new Column[requiredChannels.size()];
+    for (int i = 0; i < requiredChannels.size(); i++) {
+      requiredColumns[i] = columns[requiredChannels.get(i)];
+    }
+    this.passThroughColumns = new Column[passThroughChannels.size()];
+    for (int i = 0; i < passThroughChannels.size(); i++) {
+      passThroughColumns[i] = columns[passThroughChannels.get(i)];
     }
     this.dataTypes = dataTypes;
   }
@@ -61,8 +65,8 @@ public class Slice {
     return endIndex - startIndex;
   }
 
-  public Record getOriginalRecord(int offset) {
-    return getRecord(startIndex + offset, originalColumns);
+  public Record getPassThroughRecord(int offset) {
+    return getRecord(startIndex + offset, passThroughColumns);
   }
 
   public Iterator<Record> getRequiredRecordIterator() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java
index 993289fd1a9..cc18f5cb30e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java
@@ -34,7 +34,7 @@ public class SliceCache {
     for (Slice slice : slices) {
       long currentSize = slice.getSize();
       if (index < previousSize + currentSize) {
-        return slice.getOriginalRecord((int) (index - previousSize));
+        return slice.getPassThroughRecord((int) (index - previousSize));
       }
       previousSize += currentSize;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 96af225cd2d..00309823713 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -2384,7 +2384,16 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
           makeLayoutFromOutputSymbols(node.getChild().getOutputSymbols());
       List<Integer> requiredChannels =
           getChannelsForSymbols(node.getRequiredSymbols(), childLayout);
-
+      List<Integer> passThroughChannels =
+          passThroughSpecification
+              .map(
+                  passThrough ->
+                      getChannelsForSymbols(
+                          passThrough.getColumns().stream()
+                              
.map(TableFunctionNode.PassThroughColumn::getSymbol)
+                              .collect(Collectors.toList()),
+                          childLayout))
+              .orElse(Collections.emptyList());
       List<Integer> partitionChannels;
       if (node.getDataOrganizationSpecification().isPresent()) {
         partitionChannels =
@@ -2401,7 +2410,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
           outputDataTypes,
           properChannelCount,
           requiredChannels,
-          passThroughSpecification,
+          passThroughChannels,
           partitionChannels);
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java
index d175331533c..0acbfc504bd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java
@@ -67,13 +67,6 @@ public class ParallelizeAuxSort implements PlanOptimizer {
     //        return plan.accept(new Rewriter(context.getAnalysis()), new 
Context());
   }
 
-  private void print(PlanNode node) {
-    PlanGraphPrinter.print(node);
-    for (PlanNode child : node.getChildren()) {
-      print(child);
-    }
-  }
-
   private static class Rewriter extends PlanVisitor<PlanNode, Context> {
     private final Analysis analysis;
 

Reply via email to