This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 24a00905ff2 Fix aggregation query of TableModel when there are 
numerous regions in one device
24a00905ff2 is described below

commit 24a00905ff23ef7e951e53236eba2cf98a7cfbfe
Author: Weihao Li <[email protected]>
AuthorDate: Sat Nov 23 11:37:06 2024 +0800

    Fix aggregation query of TableModel when there are numerous regions in one 
device
---
 .../plan/relational/analyzer/Analysis.java         |   3 +-
 .../distribute/TableDistributedPlanGenerator.java  |  23 +++--
 .../plan/relational/analyzer/AggregationTest.java  | 114 ++++++++++++++++-----
 .../plan/relational/analyzer/TestMatadata.java     |  30 ++++++
 4 files changed, 133 insertions(+), 37 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index 51351f8bea5..ec0cc83f61a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -101,6 +101,7 @@ import static java.util.Collections.unmodifiableList;
 import static java.util.Collections.unmodifiableMap;
 import static java.util.Collections.unmodifiableSet;
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
 
 public class Analysis implements IAnalysis {
 
@@ -820,7 +821,7 @@ public class Analysis implements IAnalysis {
   public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter(
       String database, IDeviceID deviceId, Filter timeFilter) {
     if (dataPartition == null) {
-      return Collections.singletonList(new TRegionReplicaSet());
+      return Collections.singletonList(NOT_ASSIGNED);
     } else {
       return dataPartition.getDataRegionReplicaSetWithTimeFilter(database, 
deviceId, timeFilter);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 05a36435310..c7a1c8475ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -79,6 +79,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.GROUP_KEY_SUFFIX;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.SEPARATOR;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.SINGLE;
@@ -514,9 +515,8 @@ public class TableDistributedPlanGenerator
 
   @Override
   public List<PlanNode> visitAggregation(AggregationNode node, PlanContext 
context) {
-    OrderingScheme expectedOrderingSchema = null;
     if (node.isStreamable()) {
-      expectedOrderingSchema = 
constructOrderingSchema(node.getPreGroupedSymbols());
+      OrderingScheme expectedOrderingSchema = 
constructOrderingSchema(node.getPreGroupedSymbols());
       context.setExpectedOrderingScheme(expectedOrderingSchema);
     }
     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
@@ -563,7 +563,6 @@ public class TableDistributedPlanGenerator
   @Override
   public List<PlanNode> visitAggregationTableScan(
       AggregationTableScanNode node, PlanContext context) {
-
     boolean needSplit = false;
     List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>();
     for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
@@ -579,20 +578,28 @@ public class TableDistributedPlanGenerator
     }
 
     if (regionReplicaSetsList.isEmpty()) {
-      regionReplicaSetsList =
-          Collections.singletonList(Collections.singletonList(new 
TRegionReplicaSet()));
+      regionReplicaSetsList = 
Collections.singletonList(Collections.singletonList(NOT_ASSIGNED));
     }
 
     Map<TRegionReplicaSet, AggregationTableScanNode> regionNodeMap = new 
HashMap<>();
-    // Step is SINGLE, has date_bin(time) and device data in more than one 
region, we need to split
-    // this node into two-stage Aggregation
-    needSplit = needSplit && node.getProjection() != null && node.getStep() == 
SINGLE;
+    // Step is SINGLE and device data in more than one region, we need to 
final aggregate the result
+    // from different region here, so split
+    // this node into two-stage
+    needSplit = needSplit && node.getStep() == SINGLE;
     AggregationNode finalAggregation = null;
     if (needSplit) {
       Pair<AggregationNode, AggregationTableScanNode> splitResult =
           split(node, symbolAllocator, queryId);
       finalAggregation = splitResult.left;
       AggregationTableScanNode partialAggregation = splitResult.right;
+
+      // cover case: complete push-down + group by + streamable
+      if (!context.hasSortProperty && finalAggregation.isStreamable()) {
+        OrderingScheme expectedOrderingSchema =
+            constructOrderingSchema(node.getPreGroupedSymbols());
+        context.setExpectedOrderingScheme(expectedOrderingSchema);
+      }
+
       buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, 
partialAggregation);
     } else {
       buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, node);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
index de711140a50..6d99582aac9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
@@ -488,7 +488,8 @@ public class AggregationTest {
     // All ID columns appear in GroupingKeys
     // Output - Project - AggTableScan
     LogicalQueryPlan logicalQueryPlan =
-        planTester.createPlan("SELECT count(s2) FROM table1 group by tag1, 
tag2, tag3");
+        planTester.createPlan(
+            "SELECT count(s2) FROM table1 where tag1!='shenzhen' group by 
tag1, tag2, tag3");
     assertPlan(
         logicalQueryPlan,
         output(
@@ -507,7 +508,6 @@ public class AggregationTest {
         planTester.getFragmentPlan(0),
         output(
             collect(
-                exchange(),
                 project(
                     aggregationTableScan(
                         singleGroupingSet("tag1", "tag2", "tag3"),
@@ -532,19 +532,6 @@ public class AggregationTest {
                 ImmutableList.of("tag1", "tag2", "tag3", "count"),
                 ImmutableSet.of("tag1", "tag2", "tag3", "s2"))));
 
-    // Project - AggTableScan
-    assertPlan(
-        planTester.getFragmentPlan(2),
-        project(
-            aggregationTableScan(
-                singleGroupingSet("tag1", "tag2", "tag3"),
-                ImmutableList.of("tag1", "tag2", "tag3"), // Streamable
-                Optional.empty(),
-                SINGLE,
-                "testdb.table1",
-                ImmutableList.of("tag1", "tag2", "tag3", "count"),
-                ImmutableSet.of("tag1", "tag2", "tag3", "s2"))));
-
     // All ID columns appear in GroupingKeys, and Attribute columns , time or 
date_bin(time) also
     // appear
     logicalQueryPlan =
@@ -562,7 +549,7 @@ public class AggregationTest {
                     "testdb.table1",
                     ImmutableList.of("tag1", "tag2", "tag3", "attr1", 
"date_bin$gid", "count"),
                     ImmutableSet.of("tag1", "tag2", "tag3", "attr1", "time", 
"s2")))));
-    // Output - Project - Agg(FINAL) - Collect - AggTableScan
+    // Output - Project - Agg(FINAL) - MergeSort - AggTableScan
     assertPlan(
         planTester.getFragmentPlan(0),
         output(
@@ -575,7 +562,7 @@ public class AggregationTest {
                     ImmutableList.of("tag1", "tag2", "tag3", "attr1"), // 
Streamable
                     Optional.empty(),
                     FINAL,
-                    collect(
+                    mergeSort(
                         exchange(),
                         aggregationTableScan(
                             singleGroupingSet("tag1", "tag2", "tag3", "attr1", 
"date_bin$gid"),
@@ -599,17 +586,6 @@ public class AggregationTest {
             ImmutableList.of("tag1", "tag2", "tag3", "attr1", "date_bin$gid", 
"count_0"),
             ImmutableSet.of("tag1", "tag2", "tag3", "attr1", "time", "s2")));
 
-    assertPlan(
-        planTester.getFragmentPlan(2),
-        aggregationTableScan(
-            singleGroupingSet("tag1", "tag2", "tag3", "attr1", "date_bin$gid"),
-            ImmutableList.of("tag1", "tag2", "tag3", "attr1"), // Streamable
-            Optional.empty(),
-            PARTIAL,
-            "testdb.table1",
-            ImmutableList.of("tag1", "tag2", "tag3", "attr1", "date_bin$gid", 
"count_0"),
-            ImmutableSet.of("tag1", "tag2", "tag3", "attr1", "time", "s2")));
-
     // Global Aggregation or partialPushDown Aggregation with only one 
deviceEntry
 
     // Output - AggTableScan
@@ -696,4 +672,86 @@ public class AggregationTest {
                         "tag1", "tag2", "tag3", "first", "last", "first_by", 
"last_by"),
                     ImmutableSet.of("tag1", "tag2", "tag3", "s1", "time")))));
   }
+
+  @Test
+  public void deviceWithNumerousRegionTest() {
+    PlanTester planTester = new PlanTester();
+    LogicalQueryPlan logicalQueryPlan =
+        planTester.createPlan("SELECT count(s1) FROM table1 where tag2='B2'");
+    // complete push-down when do logical optimize
+    assertPlan(
+        logicalQueryPlan,
+        output(
+            aggregationTableScan(
+                singleGroupingSet(),
+                ImmutableList.of(),
+                Optional.empty(),
+                SINGLE,
+                "testdb.table1",
+                ImmutableList.of("count"),
+                ImmutableSet.of("s1"))));
+
+    // transform to two-stage when generate distribution plan
+    assertPlan(
+        planTester.getFragmentPlan(0),
+        output(
+            aggregation(
+                singleGroupingSet(),
+                ImmutableMap.of(
+                    Optional.of("count"),
+                    aggregationFunction("count", ImmutableList.of("count_0"))),
+                Optional.empty(),
+                FINAL,
+                collect(
+                    aggregationTableScan(
+                        singleGroupingSet(),
+                        ImmutableList.of(),
+                        Optional.empty(),
+                        PARTIAL,
+                        "testdb.table1",
+                        ImmutableList.of("count_0"),
+                        ImmutableSet.of("s1")),
+                    exchange()))));
+
+    logicalQueryPlan =
+        planTester.createPlan(
+            "SELECT count(s1) FROM table1 where tag2='B2' group by tag1, tag2, 
tag3");
+    // complete push-down when do logical optimize
+    assertPlan(
+        logicalQueryPlan,
+        output(
+            project(
+                aggregationTableScan(
+                    singleGroupingSet("tag1", "tag2", "tag3"),
+                    ImmutableList.of("tag1", "tag2", "tag3"),
+                    Optional.empty(),
+                    SINGLE,
+                    "testdb.table1",
+                    ImmutableList.of("tag1", "tag2", "tag3", "count"),
+                    ImmutableSet.of("tag1", "tag2", "tag3", "s1")))));
+
+    // transform to two-stage when generate distribution plan
+    assertPlan(
+        planTester.getFragmentPlan(0),
+        output(
+            project(
+                aggregation(
+                    singleGroupingSet("tag1", "tag2", "tag3"),
+                    ImmutableMap.of(
+                        Optional.of("count"),
+                        aggregationFunction("count", 
ImmutableList.of("count_0"))),
+                    ImmutableList.of("tag1", "tag2", "tag3"),
+                    Optional.empty(),
+                    FINAL,
+                    mergeSort(
+                        aggregationTableScan(
+                            singleGroupingSet("tag1", "tag2", "tag3"),
+                            ImmutableList.of("tag1", "tag2", "tag3"),
+                            Optional.empty(),
+                            PARTIAL,
+                            "testdb.table1",
+                            ImmutableList.of("tag1", "tag2", "tag3", 
"count_0"),
+                            ImmutableSet.of("tag1", "tag2", "tag3", "s1")),
+                        exchange())))));
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
index 5e6fc8bca6e..a1a7b9ef363 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
@@ -240,6 +240,17 @@ public class TestMatadata implements Metadata {
             new DeviceEntry(new StringArrayDeviceID(DEVICE_6.split("\\.")), 
DEVICE_6_ATTRIBUTES),
             new DeviceEntry(new StringArrayDeviceID(DEVICE_5.split("\\.")), 
DEVICE_5_ATTRIBUTES));
       }
+      if (compareNotEqualsMatch(expressionList.get(0), "tag1", "shenzhen")) {
+        return Arrays.asList(
+            new DeviceEntry(new StringArrayDeviceID(DEVICE_4.split("\\.")), 
DEVICE_4_ATTRIBUTES),
+            new DeviceEntry(new StringArrayDeviceID(DEVICE_1.split("\\.")), 
DEVICE_1_ATTRIBUTES),
+            new DeviceEntry(new StringArrayDeviceID(DEVICE_3.split("\\.")), 
DEVICE_3_ATTRIBUTES),
+            new DeviceEntry(new StringArrayDeviceID(DEVICE_2.split("\\.")), 
DEVICE_2_ATTRIBUTES));
+      }
+      if (compareEqualsMatch(expressionList.get(0), "tag2", "B2")) {
+        return Collections.singletonList(
+            new DeviceEntry(new StringArrayDeviceID(DEVICE_5.split("\\.")), 
DEVICE_5_ATTRIBUTES));
+      }
     }
 
     return Arrays.asList(
@@ -270,6 +281,25 @@ public class TestMatadata implements Metadata {
     return false;
   }
 
+  private boolean compareNotEqualsMatch(Expression expression, String 
idOrAttr, String value) {
+    if (expression instanceof ComparisonExpression
+        && ((ComparisonExpression) expression).getOperator()
+            == ComparisonExpression.Operator.NOT_EQUAL) {
+      Expression leftExpression = ((ComparisonExpression) 
expression).getLeft();
+      Expression rightExpression = ((ComparisonExpression) 
expression).getRight();
+      if (leftExpression instanceof SymbolReference && rightExpression 
instanceof StringLiteral) {
+        return ((SymbolReference) 
leftExpression).getName().equalsIgnoreCase(idOrAttr)
+            && ((StringLiteral) 
rightExpression).getValue().equalsIgnoreCase(value);
+      } else if (leftExpression instanceof StringLiteral
+          && rightExpression instanceof SymbolReference) {
+        return ((SymbolReference) 
rightExpression).getName().equalsIgnoreCase(idOrAttr)
+            && ((StringLiteral) 
leftExpression).getValue().equalsIgnoreCase(value);
+      }
+    }
+
+    return false;
+  }
+
   @Override
   public Optional<TableSchema> validateTableHeaderSchema(
       String database, TableSchema tableSchema, MPPQueryContext context, 
boolean allowCreateTable) {

Reply via email to