This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a5e7d7b30c9 Fix cases of AggregationTableScan used with Join
a5e7d7b30c9 is described below

commit a5e7d7b30c993c0657566f817e84f767656b4c23
Author: Weihao Li <[email protected]>
AuthorDate: Tue May 20 14:37:32 2025 +0800

    Fix cases of AggregationTableScan used with Join
---
 .../db/it/IoTDBMultiTAGsWithAttributesTableIT.java | 17 ++++++++++++++
 .../distribute/TableDistributedPlanGenerator.java  | 26 ++++++++++++++++------
 .../planner/node/AggregationTableScanNode.java     | 12 ++++++----
 .../plan/relational/analyzer/JoinTest.java         | 16 +++++++++++++
 4 files changed, 60 insertions(+), 11 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
index b0b70d5f0df..1e80e36e42d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
@@ -2779,6 +2779,23 @@ public class IoTDBMultiTAGsWithAttributesTableIT {
         DATABASE_NAME);
   }
 
+  @Test
+  public void aggregationTableScanWithJoinTest() {
+    expectedHeader = new String[] {"date", "_col1", "date", "_col3"};
+    retArray = new String[] 
{"1970-01-01T00:00:00.000Z,2,1970-01-01T00:00:00.000Z,2,"};
+    // Join may rename the 'time' column, so we need to ensure the correctness 
of
+    // AggregationTableScan in this case
+    tableResultSetEqualTest(
+        "select * from ("
+            + "select date_bin(1ms,time) as date,count(*)from table0 group by 
date_bin(1ms,time)) t0 "
+            + "join ("
+            + "select date_bin(1ms,time) as date,count(*)from table1 where 
time=0 group by date_bin(1ms,time)) t1 "
+            + "on t0.date = t1.date",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
   public static void repeatTest(
       String sql, String[] expectedHeader, String[] retArray, String dbName, 
int repeatTimes) {
     for (int i = 0; i < repeatTimes; i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index d99fa3da1e3..085073596dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -84,6 +84,8 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.Table
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DataNodeLocationSupplierFactory;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils;
@@ -113,9 +115,6 @@ import java.util.stream.IntStream;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
-import static 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction.DATE_BIN;
-import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.GROUP_KEY_SUFFIX;
-import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.SEPARATOR;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.SINGLE;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.TransformSortToStreamSort.isOrderByAllIdsAndTime;
@@ -125,8 +124,6 @@ import static 
org.apache.tsfile.utils.Preconditions.checkArgument;
 /** This class is used to generate distributed plan for table model. */
 public class TableDistributedPlanGenerator
     extends PlanVisitor<List<PlanNode>, 
TableDistributedPlanGenerator.PlanContext> {
-  private static final String PUSH_DOWN_DATE_BIN_SYMBOL_NAME =
-      DATE_BIN.getFunctionName() + SEPARATOR + GROUP_KEY_SUFFIX;
   private final QueryId queryId;
   private final Analysis analysis;
   private final SymbolAllocator symbolAllocator;
@@ -1432,8 +1429,23 @@ public class TableDistributedPlanGenerator
 
   // time column or push down date_bin function call in agg which should only 
have one such column
   private boolean timeRelatedSymbol(Symbol symbol, DeviceTableScanNode 
deviceTableScanNode) {
-    return deviceTableScanNode.isTimeColumn(symbol)
-        || PUSH_DOWN_DATE_BIN_SYMBOL_NAME.equals(symbol.getName());
+    if (deviceTableScanNode.isTimeColumn(symbol)) {
+      return true;
+    }
+
+    if (deviceTableScanNode instanceof AggregationTableScanNode) {
+      AggregationTableScanNode aggregationTableScanNode =
+          (AggregationTableScanNode) deviceTableScanNode;
+      if (aggregationTableScanNode.getProjection() != null
+          && !aggregationTableScanNode.getProjection().getMap().isEmpty()) {
+        Expression expression = 
aggregationTableScanNode.getProjection().get(symbol);
+        // For now, if there is FunctionCall in AggregationTableScanNode, it 
must be date_bin
+        // function of time. See 
PushAggregationIntoTableScan#isDateBinFunctionOfTime
+        return expression instanceof FunctionCall;
+      }
+    }
+
+    return false;
   }
 
   // ------------------- schema related interface 
---------------------------------------------
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
index f88437662c0..e669a98c9fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
@@ -165,8 +165,9 @@ public class AggregationTableScanNode extends 
DeviceTableScanNode {
       Symbol symbol = entry.getKey();
       AggregationNode.Aggregation aggregation = entry.getValue();
       if (aggregation.getArguments().isEmpty()) {
-        AggregationNode.Aggregation countStarAggregation = 
getCountStarAggregation(aggregation);
-        if (!getTimeColumn(assignments).isPresent()) {
+        AggregationNode.Aggregation countStarAggregation;
+        Optional<Symbol> timeSymbol = getTimeColumn(assignments);
+        if (!timeSymbol.isPresent()) {
           assignments.put(
               Symbol.of(TABLE_TIME_COLUMN_NAME),
               new ColumnSchema(
@@ -174,6 +175,9 @@ public class AggregationTableScanNode extends 
DeviceTableScanNode {
                   TimestampType.TIMESTAMP,
                   false,
                   TsTableColumnCategory.TIME));
+          countStarAggregation = getCountStarAggregation(aggregation, 
TABLE_TIME_COLUMN_NAME);
+        } else {
+          countStarAggregation = getCountStarAggregation(aggregation, 
timeSymbol.get().getName());
         }
         resultBuilder.put(symbol, countStarAggregation);
       } else {
@@ -185,7 +189,7 @@ public class AggregationTableScanNode extends 
DeviceTableScanNode {
   }
 
   private static AggregationNode.Aggregation getCountStarAggregation(
-      AggregationNode.Aggregation aggregation) {
+      AggregationNode.Aggregation aggregation, String timeSymbolName) {
     ResolvedFunction resolvedFunction = aggregation.getResolvedFunction();
     ResolvedFunction countStarFunction =
         new ResolvedFunction(
@@ -197,7 +201,7 @@ public class AggregationTableScanNode extends 
DeviceTableScanNode {
             resolvedFunction.getFunctionNullability());
     return new AggregationNode.Aggregation(
         countStarFunction,
-        Collections.singletonList(new SymbolReference(TABLE_TIME_COLUMN_NAME)),
+        Collections.singletonList(new SymbolReference(timeSymbolName)),
         aggregation.isDistinct(),
         aggregation.getFilter(),
         aggregation.getOrderingScheme(),
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
index 42436d1b477..2b0806c857c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
@@ -596,6 +596,22 @@ public class JoinTest {
                     
builder.left(sort(tableScan1)).right(sort(tableScan2)).ignoreEquiCriteria())));
   }
 
+  @Test
+  public void aggregationTableScanWithJoinTest() {
+    PlanTester planTester = new PlanTester();
+    sql =
+        "select * from ("
+            + "select date_bin(1ms,time) as date,count(*)from table1 where 
tag1='Beijing' and tag2='A1' group by date_bin(1ms,time)) t0 "
+            + "join ("
+            + "select date_bin(1ms,time) as date,count(*)from table1 where 
tag1='Beijing' and tag2='A1' group by date_bin(1ms,time)) t1 "
+            + "on t0.date = t1.date";
+    logicalQueryPlan = planTester.createPlan(sql);
+    // the sort node has been eliminated
+    assertPlan(planTester.getFragmentPlan(1), aggregationTableScan());
+    // the sort node has been eliminated
+    assertPlan(planTester.getFragmentPlan(2), aggregationTableScan());
+  }
+
   @Ignore
   @Test
   public void otherInnerJoinTests() {

Reply via email to