This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a5e7d7b30c9 Fix cases of AggregationTableScan used with Join
a5e7d7b30c9 is described below
commit a5e7d7b30c993c0657566f817e84f767656b4c23
Author: Weihao Li <[email protected]>
AuthorDate: Tue May 20 14:37:32 2025 +0800
Fix cases of AggregationTableScan used with Join
---
.../db/it/IoTDBMultiTAGsWithAttributesTableIT.java | 17 ++++++++++++++
.../distribute/TableDistributedPlanGenerator.java | 26 ++++++++++++++++------
.../planner/node/AggregationTableScanNode.java | 12 ++++++----
.../plan/relational/analyzer/JoinTest.java | 16 +++++++++++++
4 files changed, 60 insertions(+), 11 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
index b0b70d5f0df..1e80e36e42d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
@@ -2779,6 +2779,23 @@ public class IoTDBMultiTAGsWithAttributesTableIT {
DATABASE_NAME);
}
+ @Test
+ public void aggregationTableScanWithJoinTest() {
+ expectedHeader = new String[] {"date", "_col1", "date", "_col3"};
+ retArray = new String[]
{"1970-01-01T00:00:00.000Z,2,1970-01-01T00:00:00.000Z,2,"};
+ // Join may rename the 'time' column, so we need to ensure the correctness
of
+ // AggregationTableScan in this case
+ tableResultSetEqualTest(
+ "select * from ("
+ + "select date_bin(1ms,time) as date,count(*)from table0 group by
date_bin(1ms,time)) t0 "
+ + "join ("
+ + "select date_bin(1ms,time) as date,count(*)from table1 where
time=0 group by date_bin(1ms,time)) t1 "
+ + "on t0.date = t1.date",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
public static void repeatTest(
String sql, String[] expectedHeader, String[] retArray, String dbName,
int repeatTimes) {
for (int i = 0; i < repeatTimes; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index d99fa3da1e3..085073596dc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -84,6 +84,8 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.Table
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DataNodeLocationSupplierFactory;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils;
@@ -113,9 +115,6 @@ import java.util.stream.IntStream;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
-import static
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction.DATE_BIN;
-import static
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.GROUP_KEY_SUFFIX;
-import static
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.SEPARATOR;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.SINGLE;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.TransformSortToStreamSort.isOrderByAllIdsAndTime;
@@ -125,8 +124,6 @@ import static
org.apache.tsfile.utils.Preconditions.checkArgument;
/** This class is used to generate distributed plan for table model. */
public class TableDistributedPlanGenerator
extends PlanVisitor<List<PlanNode>,
TableDistributedPlanGenerator.PlanContext> {
- private static final String PUSH_DOWN_DATE_BIN_SYMBOL_NAME =
- DATE_BIN.getFunctionName() + SEPARATOR + GROUP_KEY_SUFFIX;
private final QueryId queryId;
private final Analysis analysis;
private final SymbolAllocator symbolAllocator;
@@ -1432,8 +1429,23 @@ public class TableDistributedPlanGenerator
// time column or push down date_bin function call in agg which should only
have one such column
private boolean timeRelatedSymbol(Symbol symbol, DeviceTableScanNode
deviceTableScanNode) {
- return deviceTableScanNode.isTimeColumn(symbol)
- || PUSH_DOWN_DATE_BIN_SYMBOL_NAME.equals(symbol.getName());
+ if (deviceTableScanNode.isTimeColumn(symbol)) {
+ return true;
+ }
+
+ if (deviceTableScanNode instanceof AggregationTableScanNode) {
+ AggregationTableScanNode aggregationTableScanNode =
+ (AggregationTableScanNode) deviceTableScanNode;
+ if (aggregationTableScanNode.getProjection() != null
+ && !aggregationTableScanNode.getProjection().getMap().isEmpty()) {
+ Expression expression =
aggregationTableScanNode.getProjection().get(symbol);
+ // For now, if there is FunctionCall in AggregationTableScanNode, it
must be date_bin
+ // function of time. See
PushAggregationIntoTableScan#isDateBinFunctionOfTime
+ return expression instanceof FunctionCall;
+ }
+ }
+
+ return false;
}
// ------------------- schema related interface
---------------------------------------------
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
index f88437662c0..e669a98c9fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
@@ -165,8 +165,9 @@ public class AggregationTableScanNode extends
DeviceTableScanNode {
Symbol symbol = entry.getKey();
AggregationNode.Aggregation aggregation = entry.getValue();
if (aggregation.getArguments().isEmpty()) {
- AggregationNode.Aggregation countStarAggregation =
getCountStarAggregation(aggregation);
- if (!getTimeColumn(assignments).isPresent()) {
+ AggregationNode.Aggregation countStarAggregation;
+ Optional<Symbol> timeSymbol = getTimeColumn(assignments);
+ if (!timeSymbol.isPresent()) {
assignments.put(
Symbol.of(TABLE_TIME_COLUMN_NAME),
new ColumnSchema(
@@ -174,6 +175,9 @@ public class AggregationTableScanNode extends
DeviceTableScanNode {
TimestampType.TIMESTAMP,
false,
TsTableColumnCategory.TIME));
+ countStarAggregation = getCountStarAggregation(aggregation,
TABLE_TIME_COLUMN_NAME);
+ } else {
+ countStarAggregation = getCountStarAggregation(aggregation,
timeSymbol.get().getName());
}
resultBuilder.put(symbol, countStarAggregation);
} else {
@@ -185,7 +189,7 @@ public class AggregationTableScanNode extends
DeviceTableScanNode {
}
private static AggregationNode.Aggregation getCountStarAggregation(
- AggregationNode.Aggregation aggregation) {
+ AggregationNode.Aggregation aggregation, String timeSymbolName) {
ResolvedFunction resolvedFunction = aggregation.getResolvedFunction();
ResolvedFunction countStarFunction =
new ResolvedFunction(
@@ -197,7 +201,7 @@ public class AggregationTableScanNode extends
DeviceTableScanNode {
resolvedFunction.getFunctionNullability());
return new AggregationNode.Aggregation(
countStarFunction,
- Collections.singletonList(new SymbolReference(TABLE_TIME_COLUMN_NAME)),
+ Collections.singletonList(new SymbolReference(timeSymbolName)),
aggregation.isDistinct(),
aggregation.getFilter(),
aggregation.getOrderingScheme(),
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
index 42436d1b477..2b0806c857c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
@@ -596,6 +596,22 @@ public class JoinTest {
builder.left(sort(tableScan1)).right(sort(tableScan2)).ignoreEquiCriteria())));
}
+ @Test
+ public void aggregationTableScanWithJoinTest() {
+ PlanTester planTester = new PlanTester();
+ sql =
+ "select * from ("
+ + "select date_bin(1ms,time) as date,count(*)from table1 where
tag1='Beijing' and tag2='A1' group by date_bin(1ms,time)) t0 "
+ + "join ("
+ + "select date_bin(1ms,time) as date,count(*)from table1 where
tag1='Beijing' and tag2='A1' group by date_bin(1ms,time)) t1 "
+ + "on t0.date = t1.date";
+ logicalQueryPlan = planTester.createPlan(sql);
+ // the sort node has been eliminated
+ assertPlan(planTester.getFragmentPlan(1), aggregationTableScan());
+ // the sort node has been eliminated
+ assertPlan(planTester.getFragmentPlan(2), aggregationTableScan());
+ }
+
@Ignore
@Test
public void otherInnerJoinTests() {