This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch builtin-udtf
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e4bc4ee6f3fbac099bf95d2e2dd831b0ae575247
Author: Chen YZ <[email protected]>
AuthorDate: Fri Feb 28 16:08:00 2025 +0800

    save
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  3 +-
 .../distribute/TableDistributedPlanGenerator.java  | 55 +++++++++++++++++++++-
 .../TransformAggregationToStreamable.java          | 28 +++++------
 3 files changed, 70 insertions(+), 16 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8fa597a9c6f..41706fbd323 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -370,7 +370,8 @@ public class IoTDBConfig {
   /** How many threads can concurrently execute query statement. When <= 0, 
use CPU core number. */
   private int queryThreadCount = Runtime.getRuntime().availableProcessors();
 
-  private int degreeOfParallelism = Math.max(1, 
Runtime.getRuntime().availableProcessors() / 2);
+  //  private int degreeOfParallelism = Math.max(1, 
Runtime.getRuntime().availableProcessors() / 2);
+  private int degreeOfParallelism = 1;
 
   private int mergeThresholdOfExplainAnalyze = 10;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 8ddc35912bd..d8037df5f12 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -516,12 +516,65 @@ public class TableDistributedPlanGenerator
   public List<PlanNode> visitDeviceTableScan(
       final DeviceTableScanNode node, final PlanContext context) {
     if (context.isPushDownGrouping()) {
-      return constructDeviceTableScanByTags(node, context);
+      //      return constructDeviceTableScanByTags(node, context);
+      return constructDeviceTableScanTmp(node, context);
     } else {
       return constructDeviceTableScanByRegionReplicaSet(node, context);
     }
   }
 
+  private List<PlanNode> constructDeviceTableScanTmp(
+      final DeviceTableScanNode node, final PlanContext context) {
+    List<PlanNode> result = new ArrayList<>();
+    final Map<TRegionReplicaSet, Integer> regionDeviceCount = new HashMap<>();
+    for (final DeviceEntry deviceEntry : node.getDeviceEntries()) {
+      final List<TRegionReplicaSet> regionReplicaSets =
+          analysis.getDataRegionReplicaSetWithTimeFilter(
+              node.getQualifiedObjectName().getDatabaseName(),
+              deviceEntry.getDeviceID(),
+              node.getTimeFilter());
+      List<PlanNode> tmp = new ArrayList<>();
+      for (final TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+        regionDeviceCount.put(
+            regionReplicaSet, regionDeviceCount.getOrDefault(regionReplicaSet, 
0) + 1);
+        DeviceTableScanNode scanNode =
+            new DeviceTableScanNode(
+                queryId.genPlanNodeId(),
+                node.getQualifiedObjectName(),
+                node.getOutputSymbols(),
+                node.getAssignments(),
+                new ArrayList<>(),
+                node.getIdAndAttributeIndexMap(),
+                node.getScanOrder(),
+                node.getTimePredicate().orElse(null),
+                node.getPushDownPredicate(),
+                node.getPushDownLimit(),
+                node.getPushDownOffset(),
+                node.isPushLimitToEachDevice(),
+                node.containsNonAlignedDevice());
+        scanNode.setRegionReplicaSet(regionReplicaSet);
+        scanNode.appendDeviceEntry(deviceEntry);
+        tmp.add(scanNode);
+      }
+      if (context.hasSortProperty) {
+        processSortProperty(node, tmp, context);
+      }
+      if (tmp.size() == 1) {
+        result.add(tmp.get(0));
+      } else {
+        CollectNode collectNode =
+            new CollectNode(queryId.genPlanNodeId(), tmp, 
node.getOutputSymbols());
+        result.add(collectNode);
+      }
+    }
+    context.mostUsedRegion =
+        regionDeviceCount.entrySet().stream()
+            .max(Comparator.comparingInt(Map.Entry::getValue))
+            .map(Map.Entry::getKey)
+            .orElse(null);
+    return result;
+  }
+
   private List<PlanNode> constructDeviceTableScanByTags(
       final DeviceTableScanNode node, final PlanContext context) {
     List<PlanNode> result = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
index 6a7609e907f..60944e152d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
@@ -35,6 +36,7 @@ import com.google.common.collect.ImmutableSet;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -125,20 +127,18 @@ public class TransformAggregationToStreamable implements 
PlanOptimizer {
     @Override
     public List<Symbol> visitTableFunctionProcessor(
         TableFunctionProcessorNode node, GroupContext context) {
-
-      return ImmutableList.of();
-      //      if (node.getChildren().isEmpty()) {
-      //        return ImmutableList.of();
-      //      } else if (node.isRowSemantic()) {
-      //        return visitPlan(node, context);
-      //      }
-      //      Optional<DataOrganizationSpecification> 
dataOrganizationSpecification =
-      //          node.getDataOrganizationSpecification();
-      //      return dataOrganizationSpecification
-      //          .<List<Symbol>>map(
-      //              organizationSpecification ->
-      //                  
ImmutableList.copyOf(organizationSpecification.getPartitionBy()))
-      //          .orElseGet(ImmutableList::of);
+      if (node.getChildren().isEmpty()) {
+        return ImmutableList.of();
+      } else if (node.isRowSemantic()) {
+        return visitPlan(node, context);
+      }
+      Optional<DataOrganizationSpecification> dataOrganizationSpecification =
+          node.getDataOrganizationSpecification();
+      return dataOrganizationSpecification
+          .<List<Symbol>>map(
+              organizationSpecification ->
+                  
ImmutableList.copyOf(organizationSpecification.getPartitionBy()))
+          .orElseGet(ImmutableList::of);
     }
 
     @Override

Reply via email to