This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 72b528cf533 [IOTDB-6253] Use template to accelerate front-end query
performance
72b528cf533 is described below
commit 72b528cf533f8896ad3c275617f1466cc2a49bf6
Author: Beyyes <[email protected]>
AuthorDate: Tue Nov 21 17:08:11 2023 +0800
[IOTDB-6253] Use template to accelerate front-end query performance
---
.../IoTDBAlignByDeviceWithTemplateIT.java | 584 +++++++++++++++++++++
.../operator/source/AlignedSeriesScanOperator.java | 8 +-
.../operator/source/AlignedSeriesScanUtil.java | 13 +-
.../iotdb/db/queryengine/plan/Coordinator.java | 1 -
.../db/queryengine/plan/analyze/Analysis.java | 88 +++-
.../queryengine/plan/analyze/AnalyzeVisitor.java | 46 +-
.../plan/analyze/ExpressionTypeAnalyzer.java | 11 +
.../queryengine/plan/analyze/TemplatedAnalyze.java | 403 ++++++++++++++
.../db/queryengine/plan/analyze/TypeProvider.java | 65 ++-
.../queryengine/plan/execution/QueryExecution.java | 1 +
.../plan/planner/LogicalPlanBuilder.java | 21 +-
.../plan/planner/LogicalPlanVisitor.java | 8 +-
.../plan/planner/OperatorTreeGenerator.java | 72 ++-
.../plan/planner/SubPlanTypeExtractor.java | 16 +-
.../plan/planner/TemplatedLogicalPlan.java | 195 +++++++
.../plan/planner/TemplatedLogicalPlanBuilder.java | 145 +++++
.../planner/distribution/ExchangeNodeAdder.java | 3 -
.../plan/planner/distribution/SourceRewriter.java | 24 +-
.../schemaregion/mtree/traverser/Traverser.java | 1 +
.../operator/AlignedSeriesScanOperatorTest.java | 15 +-
.../execution/operator/OperatorMemoryTest.java | 3 +-
.../distribution/DistributionPlannerCycleTest.java | 9 +-
.../queryengine/plan/plan/distribution/Util.java | 2 +-
.../queryengine/plan/plan/distribution/Util2.java | 16 +-
24 files changed, 1667 insertions(+), 83 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
new file mode 100644
index 00000000000..9ad4ab87e3f
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+
+public class IoTDBAlignByDeviceWithTemplateIT {
+ private static final String[] sqls =
+ new String[] {
+ // non-aligned template
+ "CREATE database root.sg1;",
+ "CREATE schema template t1 (s1 FLOAT encoding=RLE, s2 BOOLEAN
encoding=PLAIN compression=SNAPPY, s3 INT32);",
+ "SET SCHEMA TEMPLATE t1 to root.sg1;",
+ "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(1,1.1,false,1),
(2,2.2,false,2);",
+ "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(1,11.1,false,11),
(2,22.2,false,22);",
+ "INSERT INTO root.sg1.d3(timestamp,s1,s2,s3)
values(1,111.1,true,null), (4,444.4,true,44);",
+ "INSERT INTO root.sg1.d4(timestamp,s1,s2,s3)
values(1,1111.1,true,1111), (5,5555.5,false,5555);",
+
+ // aligned template
+ "CREATE database root.sg2;",
+ "CREATE schema template t2 aligned (s1 FLOAT encoding=RLE, s2 BOOLEAN
encoding=PLAIN compression=SNAPPY, s3 INT32);",
+ "SET SCHEMA TEMPLATE t2 to root.sg2;",
+ "INSERT INTO root.sg2.d1(timestamp,s1,s2,s3) values(1,1.1,false,1),
(2,2.2,false,2);",
+ "INSERT INTO root.sg2.d2(timestamp,s1,s2,s3) values(1,11.1,false,11),
(2,22.2,false,22);",
+ "INSERT INTO root.sg2.d3(timestamp,s1,s2,s3)
values(1,111.1,true,null), (4,444.4,true,44);",
+ "INSERT INTO root.sg2.d4(timestamp,s1,s2,s3)
values(1,1111.1,true,1111), (5,5555.5,false,5555);",
+ };
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().initClusterEnvironment();
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void selectWildcardNoFilterTest() {
+ // 1. original
+ String[] expectedHeader = new String[] {"Time,Device,s3,s1,s2"};
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,1,1.1,false,",
+ "2,root.sg1.d1,2,2.2,false,",
+ "1,root.sg1.d2,11,11.1,false,",
+ "2,root.sg1.d2,22,22.2,false,",
+ "1,root.sg1.d3,null,111.1,true,",
+ "4,root.sg1.d3,44,444.4,true,",
+ "1,root.sg1.d4,1111,1111.1,true,",
+ "5,root.sg1.d4,5555,5555.5,false,",
+ };
+ resultSetEqualTest("SELECT * FROM root.sg1.** ALIGN BY DEVICE;",
expectedHeader, retArray);
+ retArray =
+ new String[] {
+ "1,root.sg2.d1,1,1.1,false,",
+ "2,root.sg2.d1,2,2.2,false,",
+ "1,root.sg2.d2,11,11.1,false,",
+ "2,root.sg2.d2,22,22.2,false,",
+ "1,root.sg2.d3,null,111.1,true,",
+ "4,root.sg2.d3,44,444.4,true,",
+ "1,root.sg2.d4,1111,1111.1,true,",
+ "5,root.sg2.d4,5555,5555.5,false,",
+ };
+ resultSetEqualTest("SELECT * FROM root.sg2.** ALIGN BY DEVICE;",
expectedHeader, retArray);
+
+ expectedHeader = new String[] {"Time,Device,s3,s1,s2,s1"};
+ retArray =
+ new String[] {
+ "1,root.sg1.d1,1,1.1,false,1.1,",
+ "2,root.sg1.d1,2,2.2,false,2.2,",
+ "1,root.sg1.d2,11,11.1,false,11.1,",
+ "2,root.sg1.d2,22,22.2,false,22.2,",
+ "1,root.sg1.d3,null,111.1,true,111.1,",
+ "4,root.sg1.d3,44,444.4,true,444.4,",
+ "1,root.sg1.d4,1111,1111.1,true,1111.1,",
+ "5,root.sg1.d4,5555,5555.5,false,5555.5,",
+ };
+ resultSetEqualTest("SELECT *, s1 FROM root.sg1.** ALIGN BY DEVICE;",
expectedHeader, retArray);
+ retArray =
+ new String[] {
+ "1,root.sg2.d1,1,1.1,false,1.1,",
+ "2,root.sg2.d1,2,2.2,false,2.2,",
+ "1,root.sg2.d2,11,11.1,false,11.1,",
+ "2,root.sg2.d2,22,22.2,false,22.2,",
+ "1,root.sg2.d3,null,111.1,true,111.1,",
+ "4,root.sg2.d3,44,444.4,true,444.4,",
+ "1,root.sg2.d4,1111,1111.1,true,1111.1,",
+ "5,root.sg2.d4,5555,5555.5,false,5555.5,",
+ };
+ resultSetEqualTest("SELECT *, s1 FROM root.sg2.** ALIGN BY DEVICE;",
expectedHeader, retArray);
+
+ expectedHeader = new String[] {"Time,Device,s3,s1,s2"};
+ retArray =
+ new String[] {
+ "1,root.sg1.d1,1,1.1,false,",
+ "2,root.sg1.d1,2,2.2,false,",
+ "1,root.sg1.d2,11,11.1,false,",
+ "2,root.sg1.d2,22,22.2,false,",
+ };
+ resultSetEqualTest(
+ "SELECT * FROM root.sg1.d1,root.sg1.d2,root.sg1.d6 ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+ retArray =
+ new String[] {
+ "1,root.sg2.d1,1,1.1,false,",
+ "2,root.sg2.d1,2,2.2,false,",
+ "1,root.sg2.d2,11,11.1,false,",
+ "2,root.sg2.d2,22,22.2,false,",
+ };
+ resultSetEqualTest(
+ "SELECT * FROM root.sg2.d1,root.sg2.d2,root.sg2.d6 ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ // 2. limit + offset
+ expectedHeader = new String[] {"Time,Device,s3,s1,s2"};
+ retArray =
+ new String[] {
+ "2,root.sg1.d1,2,2.2,false,", "1,root.sg1.d2,11,11.1,false,",
+ };
+ resultSetEqualTest(
+ "SELECT * FROM root.sg1.** OFFSET 1 LIMIT 2 ALIGN BY DEVICE;",
expectedHeader, retArray);
+
+ retArray =
+ new String[] {
+ "2,root.sg2.d1,2,2.2,false,", "1,root.sg2.d2,11,11.1,false,",
+ };
+ resultSetEqualTest(
+ "SELECT * FROM root.sg2.** OFFSET 1 LIMIT 2 ALIGN BY DEVICE;",
expectedHeader, retArray);
+
+ // 3. order by time + limit
+ retArray =
+ new String[] {
+ "5,root.sg1.d4,5555,5555.5,false,", "4,root.sg1.d3,44,444.4,true,",
+ "2,root.sg1.d1,2,2.2,false,", "2,root.sg1.d2,22,22.2,false,",
+ };
+ resultSetEqualTest(
+ "SELECT * FROM root.sg1.** ORDER BY TIME DESC LIMIT 4 ALIGN BY
DEVICE;",
+ expectedHeader,
+ retArray);
+
+ retArray =
+ new String[] {
+ "5,root.sg2.d4,5555,5555.5,false,", "4,root.sg2.d3,44,444.4,true,",
+ "2,root.sg2.d1,2,2.2,false,", "2,root.sg2.d2,22,22.2,false,",
+ };
+ resultSetEqualTest(
+ "SELECT * FROM root.sg2.** ORDER BY TIME DESC LIMIT 4 ALIGN BY
DEVICE;",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void selectMeasurementTestNoFilterTest() {
+ // 1. original
+ String[] expectedHeader = new String[] {"Time,Device,s3,s1"};
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,1,1.1,",
+ "2,root.sg1.d1,2,2.2,",
+ "1,root.sg1.d2,11,11.1,",
+ "2,root.sg1.d2,22,22.2,",
+ "1,root.sg1.d3,null,111.1,",
+ "4,root.sg1.d3,44,444.4,",
+ "1,root.sg1.d4,1111,1111.1,",
+ "5,root.sg1.d4,5555,5555.5,",
+ };
+ resultSetEqualTest("SELECT s3,s1 FROM root.sg1.** ALIGN BY DEVICE;",
expectedHeader, retArray);
+ resultSetEqualTest(
+ "SELECT s3,s1,s_null FROM root.sg1.** ALIGN BY DEVICE;",
expectedHeader, retArray);
+
+ retArray =
+ new String[] {
+ "1,root.sg2.d1,1,1.1,",
+ "2,root.sg2.d1,2,2.2,",
+ "1,root.sg2.d2,11,11.1,",
+ "2,root.sg2.d2,22,22.2,",
+ "1,root.sg2.d3,null,111.1,",
+ "4,root.sg2.d3,44,444.4,",
+ "1,root.sg2.d4,1111,1111.1,",
+ "5,root.sg2.d4,5555,5555.5,",
+ };
+ resultSetEqualTest("SELECT s3,s1 FROM root.sg2.** ALIGN BY DEVICE;",
expectedHeader, retArray);
+ resultSetEqualTest(
+ "SELECT s3,s1,s_null FROM root.sg2.** ALIGN BY DEVICE;",
expectedHeader, retArray);
+
+ // 2. limit + offset
+ retArray =
+ new String[] {
+ "2,root.sg1.d1,2,2.2,", "1,root.sg1.d2,11,11.1,",
+ };
+ resultSetEqualTest(
+ "SELECT s3,s1 FROM root.sg1.** OFFSET 1 LIMIT 2 ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ retArray =
+ new String[] {
+ "2,root.sg2.d1,2,2.2,", "1,root.sg2.d2,11,11.1,",
+ };
+ resultSetEqualTest(
+ "SELECT s3,s1 FROM root.sg2.** OFFSET 1 LIMIT 2 ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ // 3. order by time + limit
+ retArray =
+ new String[] {
+ "5,root.sg1.d4,5555,5555.5,", "4,root.sg1.d3,44,444.4,",
+ "2,root.sg1.d1,2,2.2,", "2,root.sg1.d2,22,22.2,",
+ };
+ resultSetEqualTest(
+ "SELECT s3,s1 FROM root.sg1.** ORDER BY TIME DESC LIMIT 4 ALIGN BY
DEVICE;",
+ expectedHeader,
+ retArray);
+
+ retArray =
+ new String[] {
+ "5,root.sg2.d4,5555,5555.5,", "4,root.sg2.d3,44,444.4,",
+ "2,root.sg2.d1,2,2.2,", "2,root.sg2.d2,22,22.2,",
+ };
+ resultSetEqualTest(
+ "SELECT s3,s1 FROM root.sg2.** ORDER BY TIME DESC LIMIT 4 ALIGN BY
DEVICE;",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void selectWildcardWithFilterTest() {
+ // 1. order by time + time filter
+ String[] expectedHeader = new String[] {"Time,Device,s3,s1,s2"};
+ String[] retArray =
+ new String[] {
+ "4,root.sg1.d3,44,444.4,true,",
+ "2,root.sg1.d1,2,2.2,false,",
+ "2,root.sg1.d2,22,22.2,false,",
+ "1,root.sg1.d1,1,1.1,false,",
+ };
+ resultSetEqualTest(
+ "SELECT * FROM root.sg1.** WHERE time < 5 ORDER BY TIME DESC LIMIT 4
ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ retArray =
+ new String[] {
+ "4,root.sg2.d3,44,444.4,true,",
+ "2,root.sg2.d1,2,2.2,false,",
+ "2,root.sg2.d2,22,22.2,false,",
+ "1,root.sg2.d1,1,1.1,false,",
+ };
+ resultSetEqualTest(
+ "SELECT * FROM root.sg2.** WHERE time < 5 ORDER BY TIME DESC LIMIT 4
ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ // 2. order by time + time filter + value filter
+ retArray =
+ new String[] {
+ "4,root.sg1.d3,44,444.4,true,", "2,root.sg1.d2,22,22.2,false,",
+ };
+ resultSetEqualTest(
+ "SELECT * FROM root.sg1.** where time > 1 and time < 5 and s3>=11 and
s3<=1111 and s1 != 11.1 "
+ + "ORDER BY TIME DESC ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ retArray =
+ new String[] {
+ "4,root.sg2.d3,44,444.4,true,", "2,root.sg2.d2,22,22.2,false,",
+ };
+ resultSetEqualTest(
+ "SELECT * FROM root.sg2.** where time > 1 and time < 5 and s3>=11 and
s3<=1111 and s1 != 11.1 "
+ + "ORDER BY TIME DESC ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ // 3. order by time + value filter: s_null > 1
+ retArray = new String[] {};
+ resultSetEqualTest(
+ "SELECT * FROM root.sg1.** WHERE s_null > 1 ALIGN BY DEVICE;",
expectedHeader, retArray);
+ resultSetEqualTest(
+ "SELECT * FROM root.sg2.** WHERE s_null > 1 ALIGN BY DEVICE;",
expectedHeader, retArray);
+ }
+
+ @Test
+ public void selectMeasurementWithFilterTest() {
+ // 1. order by time + time filter
+ String[] expectedHeader = new String[] {"Time,Device,s3,s2"};
+ String[] retArray =
+ new String[] {
+ "4,root.sg1.d3,44,true,",
+ "2,root.sg1.d1,2,false,",
+ "2,root.sg1.d2,22,false,",
+ "1,root.sg1.d1,1,false,",
+ };
+ resultSetEqualTest(
+ "SELECT s3,s2 FROM root.sg1.** WHERE time < 5 ORDER BY TIME DESC LIMIT
4 ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ retArray =
+ new String[] {
+ "4,root.sg2.d3,44,true,",
+ "2,root.sg2.d1,2,false,",
+ "2,root.sg2.d2,22,false,",
+ "1,root.sg2.d1,1,false,",
+ };
+ resultSetEqualTest(
+ "SELECT s3,s2 FROM root.sg2.** WHERE time < 5 ORDER BY TIME DESC LIMIT
4 ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ // 2. order by time + time filter + value filter
+ retArray =
+ new String[] {
+ "4,root.sg1.d3,44,true,", "2,root.sg1.d2,22,false,",
+ };
+ resultSetEqualTest(
+ "SELECT s3,s2 FROM root.sg1.** where time > 1 and time < 5 and s3>=11
and s3<=1111 and s1 != 11.1 "
+ + "ORDER BY TIME DESC ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ retArray =
+ new String[] {
+ "4,root.sg2.d3,44,true,", "2,root.sg2.d2,22,false,",
+ };
+ resultSetEqualTest(
+ "SELECT s3,s2 FROM root.sg2.** where time > 1 and time < 5 and s3>=11
and s3<=1111 and s1 != 11.1 "
+ + "ORDER BY TIME DESC ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ // 3. order by time + value filter: s_null > 1
+ retArray = new String[] {};
+ resultSetEqualTest(
+ "SELECT s3,s2 FROM root.sg1.** WHERE s_null > 1 ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+ resultSetEqualTest(
+ "SELECT s3,s2 FROM root.sg2.** WHERE s_null > 1 ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void aliasTest() {
+ String[] expectedHeader = new String[] {"Time,Device,aa,bb,s3,s2"};
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,1.1,false,1,false,",
+ "2,root.sg1.d1,2.2,false,2,false,",
+ "1,root.sg1.d2,11.1,false,11,false,",
+ "2,root.sg1.d2,22.2,false,22,false,",
+ "1,root.sg1.d3,111.1,true,null,true,",
+ "4,root.sg1.d3,444.4,true,44,true,",
+ "1,root.sg1.d4,1111.1,true,1111,true,",
+ "5,root.sg1.d4,5555.5,false,5555,false,",
+ };
+ resultSetEqualTest(
+ "SELECT s1 as aa, s2 as bb, s3, s2 FROM root.sg1.** ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ expectedHeader = new String[] {"Time,Device,aa,bb,s3,s2"};
+ retArray =
+ new String[] {
+ "1,root.sg2.d1,1.1,false,1,false,",
+ "2,root.sg2.d1,2.2,false,2,false,",
+ "1,root.sg2.d2,11.1,false,11,false,",
+ "2,root.sg2.d2,22.2,false,22,false,",
+ "1,root.sg2.d3,111.1,true,null,true,",
+ "4,root.sg2.d3,444.4,true,44,true,",
+ "1,root.sg2.d4,1111.1,true,1111,true,",
+ "5,root.sg2.d4,5555.5,false,5555,false,",
+ };
+ resultSetEqualTest(
+ "SELECT s1 as aa, s2 as bb, s3, s2 FROM root.sg2.** ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ expectedHeader = new String[] {"Time,Device,a,b"};
+ retArray =
+ new String[] {
+ "1,root.sg1.d1,1.1,1.1,",
+ "2,root.sg1.d1,2.2,2.2,",
+ "1,root.sg1.d2,11.1,11.1,",
+ "2,root.sg1.d2,22.2,22.2,",
+ "1,root.sg1.d3,111.1,111.1,",
+ "4,root.sg1.d3,444.4,444.4,",
+ "1,root.sg1.d4,1111.1,1111.1,",
+ "5,root.sg1.d4,5555.5,5555.5,",
+ };
+ resultSetEqualTest(
+ "SELECT s1 as a, s1 as b FROM root.sg1.** ALIGN BY DEVICE;",
expectedHeader, retArray);
+
+ expectedHeader = new String[] {"Time,Device,a,b"};
+ retArray =
+ new String[] {
+ "1,root.sg2.d1,1.1,1.1,",
+ "2,root.sg2.d1,2.2,2.2,",
+ "1,root.sg2.d2,11.1,11.1,",
+ "2,root.sg2.d2,22.2,22.2,",
+ "1,root.sg2.d3,111.1,111.1,",
+ "4,root.sg2.d3,444.4,444.4,",
+ "1,root.sg2.d4,1111.1,1111.1,",
+ "5,root.sg2.d4,5555.5,5555.5,",
+ };
+ resultSetEqualTest(
+ "SELECT s1 as a, s1 as b FROM root.sg2.** ALIGN BY DEVICE;",
expectedHeader, retArray);
+ }
+
+ @Test
+ public void orderByExpressionTest() {
+ // order by expression is not supported temporarily
+ // 1. order by basic measurement
+ String[] expectedHeader = new String[] {"Time,Device,s3,s1,s2"};
+ String[] retArray =
+ new String[] {
+ "5,root.sg1.d4,5555,5555.5,false,",
+ "2,root.sg1.d2,22,22.2,false,",
+ "1,root.sg1.d2,11,11.1,false,",
+ "2,root.sg1.d1,2,2.2,false,",
+ "1,root.sg1.d1,1,1.1,false,",
+ "1,root.sg1.d4,1111,1111.1,true,",
+ "4,root.sg1.d3,44,444.4,true,",
+ "1,root.sg1.d3,null,111.1,true,",
+ };
+ resultSetEqualTest(
+ "SELECT * FROM root.sg1.** order by s2 asc, s1 desc ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ retArray =
+ new String[] {
+ "5,root.sg2.d4,5555,5555.5,false,",
+ "2,root.sg2.d2,22,22.2,false,",
+ "1,root.sg2.d2,11,11.1,false,",
+ "2,root.sg2.d1,2,2.2,false,",
+ "1,root.sg2.d1,1,1.1,false,",
+ "1,root.sg2.d4,1111,1111.1,true,",
+ "4,root.sg2.d3,44,444.4,true,",
+ "1,root.sg2.d3,null,111.1,true,",
+ };
+ resultSetEqualTest(
+ "SELECT * FROM root.sg2.** order by s2 asc, s1 desc ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ // 2. select measurement is different with order by measurement
+ expectedHeader = new String[] {"Time,Device,s3"};
+ retArray =
+ new String[] {
+ "5,root.sg1.d4,5555,",
+ "2,root.sg1.d2,22,",
+ "1,root.sg1.d2,11,",
+ "2,root.sg1.d1,2,",
+ "1,root.sg1.d1,1,",
+ "1,root.sg1.d4,1111,",
+ "4,root.sg1.d3,44,",
+ "1,root.sg1.d3,null,",
+ };
+ resultSetEqualTest(
+ "SELECT s3 FROM root.sg1.** order by s2 asc, s1 desc ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ retArray =
+ new String[] {
+ "5,root.sg2.d4,5555,",
+ "2,root.sg2.d2,22,",
+ "1,root.sg2.d2,11,",
+ "2,root.sg2.d1,2,",
+ "1,root.sg2.d1,1,",
+ "1,root.sg2.d4,1111,",
+ "4,root.sg2.d3,44,",
+ "1,root.sg2.d3,null,",
+ };
+ resultSetEqualTest(
+ "SELECT s3 FROM root.sg2.** order by s2 asc, s1 desc ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ // 3. order by expression
+ retArray =
+ new String[] {
+ "5,root.sg1.d4,5555,",
+ "1,root.sg1.d4,1111,",
+ "4,root.sg1.d3,44,",
+ "2,root.sg1.d2,22,",
+ "1,root.sg1.d2,11,",
+ "2,root.sg1.d1,2,",
+ "1,root.sg1.d1,1,",
+ "1,root.sg1.d3,null,",
+ };
+ resultSetEqualTest(
+ "SELECT s3 FROM root.sg1.** order by s1+s3 desc ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+ retArray =
+ new String[] {
+ "5,root.sg2.d4,5555,",
+ "1,root.sg2.d4,1111,",
+ "4,root.sg2.d3,44,",
+ "2,root.sg2.d2,22,",
+ "1,root.sg2.d2,11,",
+ "2,root.sg2.d1,2,",
+ "1,root.sg2.d1,1,",
+ "1,root.sg2.d3,null,",
+ };
+ resultSetEqualTest(
+ "SELECT s3 FROM root.sg2.** order by s1+s3 desc ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void templateInvalidTest() {
+ // 1. non align by device query
+ String[] expectedHeader = new String[]
{"Time,root.sg1.d4.s3,root.sg1.d4.s1,root.sg1.d4.s2"};
+ String[] retArray =
+ new String[] {
+ "1,1111,1111.1,true,", "5,5555,5555.5,false,",
+ };
+ resultSetEqualTest("SELECT * FROM root.sg1.** slimit 3;", expectedHeader,
retArray);
+
+ expectedHeader = new String[]
{"Time,root.sg2.d4.s3,root.sg2.d4.s1,root.sg2.d4.s2"};
+ retArray =
+ new String[] {
+ "1,1111,1111.1,true,", "5,5555,5555.5,false,",
+ };
+ resultSetEqualTest("SELECT * FROM root.sg2.** slimit 3;", expectedHeader,
retArray);
+
+ // 2. aggregation
+ expectedHeader = new String[] {"Device,count(s1 + 1)"};
+ retArray =
+ new String[] {
+ "root.sg1.d1,2,", "root.sg1.d2,2,", "root.sg1.d3,2,",
"root.sg1.d4,2,",
+ };
+ resultSetEqualTest(
+ "select count(s1+1) from root.sg1.** align by device;",
expectedHeader, retArray);
+ }
+
+ /**
+  * Executes every statement in {@code sqls}, in order, over a single connection obtained from
+  * the test environment, creating the databases, schema templates and rows the tests query.
+  *
+  * @throws IllegalStateException if the connection cannot be opened or any statement fails; the
+  *     original exception is preserved as the cause
+  */
+ private static void insertData() {
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     for (String sql : sqls) {
+       statement.execute(sql);
+     }
+   } catch (Exception e) {
+     // Fail fast: swallowing the error here would let the tests run against a partially
+     // initialised database and fail later with misleading result-set mismatches.
+     throw new IllegalStateException("Failed to prepare test data", e);
+   }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
index c80bed6963d..47357a7d39e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -33,6 +34,7 @@ import
org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder.MAX_LINE_NUMBER;
@@ -49,7 +51,8 @@ public class AlignedSeriesScanOperator extends
AbstractDataSourceOperator {
AlignedPath seriesPath,
Ordering scanOrder,
SeriesScanOptions seriesScanOptions,
- boolean queryAllSensors) {
+ boolean queryAllSensors,
+ List<TSDataType> dataTypes) {
this.sourceId = sourceId;
this.operatorContext = context;
this.seriesScanUtil =
@@ -58,7 +61,8 @@ public class AlignedSeriesScanOperator extends
AbstractDataSourceOperator {
scanOrder,
seriesScanOptions,
context.getInstanceContext(),
- queryAllSensors);
+ queryAllSensors,
+ dataTypes);
// time + all value columns
this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList());
this.valueColumnCount = seriesPath.getColumnNum();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
index 300412081f4..9adbd6ffd33 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
@@ -60,7 +60,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
Ordering scanOrder,
SeriesScanOptions scanOptions,
FragmentInstanceContext context) {
- this(seriesPath, scanOrder, scanOptions, context, false);
+ this(seriesPath, scanOrder, scanOptions, context, false, null);
}
public AlignedSeriesScanUtil(
@@ -68,11 +68,16 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
Ordering scanOrder,
SeriesScanOptions scanOptions,
FragmentInstanceContext context,
- boolean queryAllSensors) {
+ boolean queryAllSensors,
+ List<TSDataType> givenDataTypes) {
super(seriesPath, scanOrder, scanOptions, context);
dataTypes =
- ((AlignedPath) seriesPath)
-
.getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
+ givenDataTypes != null
+ ? givenDataTypes
+ : ((AlignedPath) seriesPath)
+ .getSchemaList().stream()
+ .map(IMeasurementSchema::getType)
+ .collect(Collectors.toList());
isAligned = true;
this.queryAllSensors = queryAllSensors;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index cf4eb213752..1487a0f9932 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -158,7 +158,6 @@ public class Coordinator {
queryContext.setTimeOut(Long.MAX_VALUE);
}
execution.start();
-
return execution.getStatus();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index e9a541a9d79..ac563fb5494 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.FillDescriptor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByParameter;
@@ -47,6 +48,7 @@ import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import java.util.ArrayList;
import java.util.HashMap;
@@ -131,9 +133,6 @@ public class Analysis {
// the list of device names
private List<PartialPath> deviceList;
- // map from output device name to queried devices
- private Map<String, List<String>> outputDeviceToQueriedDevicesMap;
-
// map from device name to series/aggregation under this device
private Map<String, Set<Expression>> deviceToSourceExpressions;
@@ -164,7 +163,10 @@ public class Analysis {
private Set<Expression> deviceViewOutputExpressions;
- private final Map<String, Set<Expression>> deviceToOutputExpressions = new
HashMap<>();
+ private Map<String, Set<Expression>> deviceToOutputExpressions = new
HashMap<>();
+
+ // map from output device name to its queried device
+ private Map<String, String> outputDeviceToQueriedDevicesMap;
// indicates whether DeviceView need special process when rewriteSource in
DistributionPlan,
// you can see SourceRewriter#visitDeviceView to get more information
@@ -271,6 +273,21 @@ public class Analysis {
// if `order by limit N align by device` query use topK optimization
private boolean useTopKNode = false;
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+ // All Queried Devices Are Set In One Template
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+
+ // if all devices are set in one template in align by device query, this
variable will not be null
+ private Template deviceTemplate;
+ // when deviceTemplate is not empty and all expressions in this query are
templated measurements,
+ // i.e. no aggregation and arithmetic expression
+ private boolean onlyQueryTemplateMeasurements = true;
+ // if it is wildcard query
+ private boolean templateWildCardQuery;
+ // all queried measurementList and schemaList in deviceTemplate.
+ private List<String> measurementList;
+ private List<IMeasurementSchema> measurementSchemaList;
+
public Analysis() {
this.finishQueryAfterAnalyze = false;
}
@@ -351,6 +368,13 @@ public class Analysis {
if (expression.getExpressionType() == ExpressionType.NULL) {
return null;
}
+
+ if (isAllDevicesInOneTemplate()
+ && (isOnlyQueryTemplateMeasurements() || expression instanceof
TimeSeriesOperand)) {
+ TimeSeriesOperand seriesOperand = (TimeSeriesOperand) expression;
+ return
deviceTemplate.getSchemaMap().get(seriesOperand.getPath().getMeasurement()).getType();
+ }
+
TSDataType type = expressionTypes.get(NodeRef.of(expression));
checkArgument(type != null, "Expression is not analyzed: %s", expression);
return type;
@@ -751,12 +775,12 @@ public class Analysis {
this.lastQueryNonWritableViewSourceExpressionMap =
lastQueryNonWritableViewSourceExpressionMap;
}
- public Map<String, List<String>> getOutputDeviceToQueriedDevicesMap() {
+ public Map<String, String> getOutputDeviceToQueriedDevicesMap() {
return outputDeviceToQueriedDevicesMap;
}
public void setOutputDeviceToQueriedDevicesMap(
- Map<String, List<String>> outputDeviceToQueriedDevicesMap) {
+ Map<String, String> outputDeviceToQueriedDevicesMap) {
this.outputDeviceToQueriedDevicesMap = outputDeviceToQueriedDevicesMap;
}
@@ -764,6 +788,10 @@ public class Analysis {
return deviceToOutputExpressions;
}
+ public void setDeviceToOutputExpressions(Map<String, Set<Expression>>
deviceToOutputExpressions) {
+ this.deviceToOutputExpressions = deviceToOutputExpressions;
+ }
+
public boolean isLastLevelUseWildcard() {
return lastLevelUseWildcard;
}
@@ -787,4 +815,52 @@ public class Analysis {
public List<PartialPath> getDeviceList() {
return deviceList;
}
+
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+ // All Queried Devices Are Set In One Template
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+
+ public boolean isAllDevicesInOneTemplate() {
+ return this.deviceTemplate != null;
+ }
+
+ public Template getDeviceTemplate() {
+ return this.deviceTemplate;
+ }
+
+ public void setDeviceTemplate(Template template) {
+ this.deviceTemplate = template;
+ }
+
+ public boolean isOnlyQueryTemplateMeasurements() {
+ return onlyQueryTemplateMeasurements;
+ }
+
+ public void setOnlyQueryTemplateMeasurements(boolean
onlyQueryTemplateMeasurements) {
+ this.onlyQueryTemplateMeasurements = onlyQueryTemplateMeasurements;
+ }
+
+ public List<String> getMeasurementList() {
+ return this.measurementList;
+ }
+
+ public void setMeasurementList(List<String> measurementList) {
+ this.measurementList = measurementList;
+ }
+
+ public List<IMeasurementSchema> getMeasurementSchemaList() {
+ return this.measurementSchemaList;
+ }
+
+ public void setMeasurementSchemaList(List<IMeasurementSchema>
measurementSchemaList) {
+ this.measurementSchemaList = measurementSchemaList;
+ }
+
+ public void setTemplateWildCardQuery() {
+ this.templateWildCardQuery = true;
+ }
+
+ public boolean isTemplateWildCardQuery() {
+ return this.templateWildCardQuery;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index bd468967cf4..d8993125300 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -202,12 +202,12 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
private static final Logger logger =
LoggerFactory.getLogger(AnalyzeVisitor.class);
- private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
- private static final Expression DEVICE_EXPRESSION =
+ static final Expression DEVICE_EXPRESSION =
TimeSeriesOperand.constructColumnHeaderExpression(DEVICE,
TSDataType.TEXT);
- private static final Expression END_TIME_EXPRESSION =
+ static final Expression END_TIME_EXPRESSION =
TimeSeriesOperand.constructColumnHeaderExpression(ENDTIME,
TSDataType.INT64);
private final List<String> lastQueryColumnNames =
@@ -242,11 +242,16 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext
context) {
Analysis analysis = new Analysis();
analysis.setLastLevelUseWildcard(queryStatement.isLastLevelUseWildcard());
+
+ long startTime = System.currentTimeMillis();
try {
// check for semantic errors
queryStatement.semanticCheck();
ISchemaTree schemaTree = analyzeSchema(queryStatement, analysis,
context);
+
+ logger.warn("--- [analyzeSchema] : {}ms", System.currentTimeMillis() -
startTime);
+
// If there is no leaf node in the schema tree, the query should be
completed immediately
if (schemaTree.isEmpty()) {
return finishQuery(queryStatement, analysis);
@@ -261,6 +266,11 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
List<Pair<Expression, String>> outputExpressions;
if (queryStatement.isAlignByDevice()) {
+ if (TemplatedAnalyze.canBuildPlanUseTemplate(
+ analysis, queryStatement, partitionFetcher, schemaTree)) {
+ return analysis;
+ }
+
List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) {
@@ -269,7 +279,9 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList);
+
outputExpressions = analyzeSelect(analysis, queryStatement,
schemaTree, deviceList);
+
if (deviceList.isEmpty()) {
return finishQuery(queryStatement, analysis);
}
@@ -373,6 +385,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
QueryPlanCostMetricSet.getInstance()
.recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
}
+
analysis.setSchemaTree(schemaTree);
return schemaTree;
}
@@ -591,7 +604,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- List<PartialPath> deviceSet) {
+ List<PartialPath> deviceList) {
List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
Map<String, Set<Expression>> deviceToSelectExpressions = new HashMap<>();
ColumnPaginationController paginationController =
@@ -605,7 +618,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
// use LinkedHashMap for order-preserving
Map<Expression, Map<String, Expression>>
measurementToDeviceSelectExpressions =
new LinkedHashMap<>();
- for (PartialPath device : deviceSet) {
+ for (PartialPath device : deviceList) {
List<Expression> selectExpressionsOfOneDevice =
concatDeviceAndBindSchemaForExpression(selectExpression, device,
schemaTree);
if (selectExpressionsOfOneDevice.isEmpty()) {
@@ -660,12 +673,12 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
// remove devices without measurements to compute
Set<PartialPath> noMeasurementDevices = new HashSet<>();
- for (PartialPath device : deviceSet) {
+ for (PartialPath device : deviceList) {
if (!deviceToSelectExpressions.containsKey(device.getFullPath())) {
noMeasurementDevices.add(device);
}
}
- deviceSet.removeAll(noMeasurementDevices);
+ deviceList.removeAll(noMeasurementDevices);
// when the select expression of any device is empty,
// the where expression map also need remove this device
@@ -1224,7 +1237,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
}
- Map<String, List<String>> outputDeviceToQueriedDevicesMap = new
LinkedHashMap<>();
+ Map<String, String> outputDeviceToQueriedDevicesMap = new
LinkedHashMap<>();
for (Map.Entry<String, Set<Expression>> entry :
deviceToSourceExpressions.entrySet()) {
String deviceName = entry.getKey();
Set<Expression> sourceExpressionsUnderDevice = entry.getValue();
@@ -1236,7 +1249,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
throw new SemanticException(
"Cross-device queries are not supported in ALIGN BY DEVICE
queries.");
}
- outputDeviceToQueriedDevicesMap.put(deviceName, new
ArrayList<>(queriedDevices));
+ outputDeviceToQueriedDevicesMap.put(deviceName,
queriedDevices.iterator().next());
}
analysis.setDeviceToSourceExpressions(deviceToSourceExpressions);
@@ -1258,7 +1271,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
}
- private static final String WHERE_WRONG_TYPE_ERROR_MSG =
+ static final String WHERE_WRONG_TYPE_ERROR_MSG =
"The output type of the expression in WHERE clause should be BOOLEAN,
actual data type: %s.";
private void analyzeDeviceToWhere(
@@ -1359,7 +1372,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analyzeDeviceViewSpecialProcess(deviceViewOutputExpressions,
queryStatement, analysis));
}
- private boolean analyzeDeviceViewSpecialProcess(
+ static boolean analyzeDeviceViewSpecialProcess(
Set<Expression> deviceViewOutputExpressions,
QueryStatement queryStatement,
Analysis analysis) {
@@ -1431,7 +1444,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
}
- private void analyzeOutput(
+ static void analyzeOutput(
Analysis analysis,
QueryStatement queryStatement,
List<Pair<Expression, String>> outputExpressions) {
@@ -1513,7 +1526,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
queryStatement.updateSortItems(orderByExpressions);
}
- private TSDataType analyzeExpressionType(Analysis analysis, Expression
expression) {
+ static TSDataType analyzeExpressionType(Analysis analysis, Expression
expression) {
return analyzeExpression(analysis, expression);
}
@@ -1775,7 +1788,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analysis.setGlobalTimeFilter(globalTimeFilter);
}
- private void analyzeFill(Analysis analysis, QueryStatement queryStatement) {
+ static void analyzeFill(Analysis analysis, QueryStatement queryStatement) {
if (queryStatement.getFillComponent() == null) {
return;
}
@@ -1792,10 +1805,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis, QueryStatement queryStatement, ISchemaTree
schemaTree) {
Set<String> deviceSet = new HashSet<>();
if (queryStatement.isAlignByDevice()) {
- deviceSet =
- analysis.getOutputDeviceToQueriedDevicesMap().values().stream()
- .flatMap(List::stream)
- .collect(Collectors.toSet());
+ deviceSet = new
HashSet<>(analysis.getOutputDeviceToQueriedDevicesMap().values());
} else {
for (Expression expression : analysis.getSourceExpressions()) {
deviceSet.add(ExpressionAnalyzer.getDeviceNameInSourceExpression(expression));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
index 41cd1fbe882..09b3966b079 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
@@ -59,6 +59,17 @@ public class ExpressionTypeAnalyzer {
private ExpressionTypeAnalyzer() {}
public static TSDataType analyzeExpression(Analysis analysis, Expression
expression) {
+ if (analysis.isAllDevicesInOneTemplate()
+ && (analysis.isOnlyQueryTemplateMeasurements()
+ || expression instanceof TimeSeriesOperand)) {
+ TimeSeriesOperand seriesOperand = (TimeSeriesOperand) expression;
+ return analysis
+ .getDeviceTemplate()
+ .getSchemaMap()
+ .get(seriesOperand.getPath().getMeasurement())
+ .getType();
+ }
+
if (!analysis.getExpressionTypes().containsKey(NodeRef.of(expression))) {
ExpressionTypeAnalyzer analyzer = new ExpressionTypeAnalyzer();
analyzer.analyze(expression);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
new file mode 100644
index 00000000000..0c7c868319c
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.analyze;
+
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.MeasurementNotExistException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn;
+import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.CONFIG;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.DEVICE_EXPRESSION;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.END_TIME_EXPRESSION;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.WHERE_WRONG_TYPE_ERROR_MSG;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeDeviceViewSpecialProcess;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeExpressionType;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeFill;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeOutput;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTimePartitionSlotList;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.normalizeExpression;
+
+/**
+ * This class provides an accelerated analysis path for ALIGN BY DEVICE queries over multiple
+ * devices. The optimization is only used when all queried devices use the same template, which
+ * avoids many unnecessary per-device judgements.
+ *
+ * <p>e.g. for the query `SELECT * FROM root.xx.** order by device/time align by device`, all
+ * devices matched by `root.xx.**` must use the same template. Queries that order by an expression
+ * are currently not handled by this path.
+ */
+public class TemplatedAnalyze {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TemplatedAnalyze.class);
+
+ private TemplatedAnalyze() {
+   // static utility class, not meant to be instantiated
+ }
+
+ /**
+  * Tries to analyze an ALIGN BY DEVICE query through the template fast path.
+  *
+  * <p>The fast path is taken only when the query has no aggregation, GROUP BY, SELECT INTO, FILL
+  * or ORDER BY expression, {@code schemaTree.hasNormalTimeSeries()} is false, exactly one
+  * non-null template is in use, and every result column is either {@code *} or a plain
+  * {@link TimeSeriesOperand}. When it is taken, {@code analysis} is populated so that
+  * TemplatedLogicalPlan and TemplatedLogicalPlanBuilder can build the plan.
+  *
+  * @param analysis the analysis object to populate
+  * @param queryStatement the query being analyzed
+  * @param partitionFetcher used to fetch the data partitions of the matched devices
+  * @param schemaTree the schema tree fetched for this query
+  * @return {@code true} if the analysis was completed through the template path, {@code false}
+  *     if the caller has to continue with the regular analysis
+  */
+ public static boolean canBuildPlanUseTemplate(
+     Analysis analysis,
+     QueryStatement queryStatement,
+     IPartitionFetcher partitionFetcher,
+     ISchemaTree schemaTree) {
+   // Reject every unsupported query shape before `analysis` is touched, so that the regular
+   // analysis path never starts from partially initialized template state.
+   if (queryStatement.isAggregationQuery()
+       || queryStatement.isGroupBy()
+       || queryStatement.isGroupByTime()
+       || queryStatement.isSelectInto()
+       || queryStatement.hasFill()
+       || queryStatement.hasOrderByExpression()
+       || schemaTree.hasNormalTimeSeries()) {
+     return false;
+   }
+
+   List<Template> templates = schemaTree.getUsingTemplates();
+   if (templates.size() != 1 || templates.get(0) == null) {
+     // without exactly one usable template there is nothing to accelerate
+     return false;
+   }
+   Template template = templates.get(0);
+
+   List<ResultColumn> resultColumns = queryStatement.getSelectComponent().getResultColumns();
+   List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
+   for (ResultColumn resultColumn : resultColumns) {
+     Expression expression = resultColumn.getExpression();
+     if ("*".equals(expression.getOutputSymbol())) {
+       // expand the wildcard to every measurement of the template; these columns have no alias
+       for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) {
+         String measurementName = entry.getKey();
+         IMeasurementSchema measurementSchema = entry.getValue();
+         TimeSeriesOperand measurementPath =
+             new TimeSeriesOperand(
+                 new MeasurementPath(new String[] {measurementName}, measurementSchema));
+         outputExpressions.add(new Pair<>(measurementPath, null));
+       }
+       if (resultColumns.size() == 1) {
+         analysis.setTemplateWildCardQuery();
+       }
+     } else if (expression instanceof TimeSeriesOperand) {
+       String measurementName = ((TimeSeriesOperand) expression).getPath().getMeasurement();
+       // measurements that are not defined in the template are skipped
+       if (template.getSchemaMap().containsKey(measurementName)) {
+         IMeasurementSchema measurementSchema = template.getSchemaMap().get(measurementName);
+         TimeSeriesOperand measurementPath =
+             new TimeSeriesOperand(
+                 new MeasurementPath(new String[] {measurementName}, measurementSchema));
+         outputExpressions.add(new Pair<>(measurementPath, resultColumn.getAlias()));
+       }
+     } else {
+       // any other kind of select expression is not supported by the template path
+       return false;
+     }
+   }
+
+   analyzeSelect(queryStatement, analysis, outputExpressions, template);
+
+   List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
+
+   // may remove devices from deviceList when the WHERE clause cannot be bound to them
+   analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList);
+
+   if (deviceList.isEmpty()) {
+     analysis.setFinishQueryAfterAnalyze(true);
+     return false;
+   }
+   analysis.setDeviceList(deviceList);
+
+   analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, deviceList);
+   analyzeDeviceToSourceTransform(analysis);
+   analyzeDeviceToSource(analysis);
+
+   analyzeDeviceViewOutput(analysis, queryStatement);
+   analyzeDeviceViewInput(analysis);
+
+   analyzeFill(analysis, queryStatement);
+
+   // generate result set header according to output expressions
+   analyzeOutput(analysis, queryStatement, outputExpressions);
+
+   // fetch partition information
+   analyzeDataPartition(analysis, schemaTree, partitionFetcher);
+   return true;
+ }
+
+ /**
+  * Derives the select expressions, measurement names and measurement schemas from the output
+  * expressions and stores them, together with the template, in {@code analysis}.
+  *
+  * <p>The select expressions always start with the device column, optionally followed by the
+  * end-time column; duplicated output expressions are only recorded once.
+  */
+ private static void analyzeSelect(
+     QueryStatement queryStatement,
+     Analysis analysis,
+     List<Pair<Expression, String>> outputExpressions,
+     Template template) {
+   LinkedHashSet<Expression> selected = new LinkedHashSet<>();
+   selected.add(DEVICE_EXPRESSION);
+   if (queryStatement.isOutputEndTime()) {
+     selected.add(END_TIME_EXPRESSION);
+   }
+
+   List<String> names = new ArrayList<>();
+   List<IMeasurementSchema> schemas = new ArrayList<>();
+   for (Pair<Expression, String> output : outputExpressions) {
+     // Set#add reports whether the expression was new, which replaces a separate contains check
+     if (selected.add(output.left)) {
+       String name = ((TimeSeriesOperand) output.getLeft()).getPath().getMeasurement();
+       names.add(name);
+       schemas.add(template.getSchema(name));
+     }
+   }
+
+   analysis.setOutputExpressions(outputExpressions);
+   analysis.setSelectExpressions(selected);
+   analysis.setDeviceTemplate(template);
+   analysis.setMeasurementList(names);
+   analysis.setMeasurementSchemaList(schemas);
+ }
+
+ /**
+  * Collects the distinct devices matched by the FROM clause patterns and returns them sorted in
+  * ascending or descending order, depending on the result device order of the query.
+  *
+  * @return a new, modifiable list of matched device paths
+  */
+ private static List<PartialPath> analyzeFrom(
+     QueryStatement queryStatement, ISchemaTree schemaTree) {
+   // a set removes devices that are matched by more than one pattern
+   Set<PartialPath> matched = new HashSet<>();
+   for (PartialPath pattern : queryStatement.getFromComponent().getPrefixPaths()) {
+     for (DeviceSchemaInfo deviceInfo : schemaTree.getMatchedDevices(pattern)) {
+       matched.add(deviceInfo.getDevicePath());
+     }
+   }
+
+   List<PartialPath> ordered = new ArrayList<>(matched);
+   if (queryStatement.getResultDeviceOrder() == Ordering.ASC) {
+     ordered.sort(Comparator.naturalOrder());
+   } else {
+     ordered.sort(Comparator.reverseOrder());
+   }
+   return ordered;
+ }
+
+ /**
+  * Binds the WHERE predicate to every device and stores the per-device predicates in
+  * {@code analysis}. Does nothing when the query has no WHERE clause.
+  *
+  * <p>Devices for which binding fails with {@link MeasurementNotExistException} are removed from
+  * {@code deviceList} in place.
+  *
+  * @throws SemanticException if the bound predicate of a device is not of type BOOLEAN
+  */
+ private static void analyzeDeviceToWhere(
+     Analysis analysis,
+     QueryStatement queryStatement,
+     ISchemaTree schemaTree,
+     List<PartialPath> deviceList) {
+   if (!queryStatement.hasWhere()) {
+     return;
+   }
+
+   analysis.setOnlyQueryTemplateMeasurements(false);
+   Map<String, Expression> predicatePerDevice = new HashMap<>();
+   // an explicit iterator is required because devices are dropped while iterating
+   for (Iterator<PartialPath> devices = deviceList.iterator(); devices.hasNext(); ) {
+     PartialPath device = devices.next();
+     Expression predicate;
+     try {
+       // can move this judgement to TemplatedLogicalPlan?
+       Expression bound = analyzeWhereSplitByDevice(queryStatement, device, schemaTree);
+       predicate = normalizeExpression(bound);
+     } catch (MeasurementNotExistException e) {
+       LOGGER.warn(
+           "Meets MeasurementNotExistException in analyzeDeviceToWhere "
+               + "when executing align by device, error msg: {}",
+           e.getMessage());
+       devices.remove();
+       continue;
+     }
+
+     TSDataType predicateType = analyzeExpressionType(analysis, predicate);
+     if (predicateType != TSDataType.BOOLEAN) {
+       throw new SemanticException(String.format(WHERE_WRONG_TYPE_ERROR_MSG, predicateType));
+     }
+
+     predicatePerDevice.put(device.getFullPath(), predicate);
+   }
+   analysis.setDeviceToWhereExpression(predicatePerDevice);
+ }
+
+ /**
+  * Binds the WHERE predicate of the query to one device and combines the resulting distinct
+  * conjuncts into a single query filter expression.
+  */
+ private static Expression analyzeWhereSplitByDevice(
+     QueryStatement queryStatement, PartialPath devicePath, ISchemaTree schemaTree) {
+   Expression predicate = queryStatement.getWhereCondition().getPredicate();
+   List<Expression> conjuncts =
+       ExpressionAnalyzer.concatDeviceAndBindSchemaForPredicate(
+           predicate, devicePath, schemaTree, true);
+   List<Expression> distinctConjuncts =
+       conjuncts.stream().distinct().collect(Collectors.toList());
+   return ExpressionUtils.constructQueryFilter(distinctConjuncts);
+ }
+
+ /**
+  * Binds every ORDER BY expression to each device and stores the per-device and device-view
+  * sort information in {@code analysis}. Does nothing when the query has no ORDER BY expression.
+  *
+  * @throws SemanticException if an ORDER BY expression matches no or more than one time series
+  *     of a device, or if its data type is not comparable
+  */
+ private static void analyzeDeviceToOrderBy(
+     Analysis analysis,
+     QueryStatement queryStatement,
+     ISchemaTree schemaTree,
+     List<PartialPath> deviceSet) {
+   if (!queryStatement.hasOrderByExpression()) {
+     return;
+   }
+
+   Map<String, Set<Expression>> orderByPerDevice = new LinkedHashMap<>();
+   Map<String, List<SortItem>> sortItemsPerDevice = new LinkedHashMap<>();
+   // build the device-view outputColumn for the sortNode above the deviceViewNode
+   Set<Expression> deviceViewOrderBy = new LinkedHashSet<>();
+
+   for (PartialPath device : deviceSet) {
+     Set<Expression> boundForDevice = new LinkedHashSet<>();
+     for (Expression sortExpression : queryStatement.getExpressionSortItemList()) {
+       Expression bound = bindSingleOrderByExpression(sortExpression, device, schemaTree);
+       TSDataType dataType = analyzeExpressionType(analysis, bound);
+       if (!dataType.isComparable()) {
+         throw new SemanticException(
+             String.format("The data type of %s is not comparable", dataType));
+       }
+
+       Expression deviceViewExpression = getMeasurementExpression(bound, analysis);
+       analyzeExpressionType(analysis, deviceViewExpression);
+
+       deviceViewOrderBy.add(deviceViewExpression);
+       boundForDevice.add(bound);
+     }
+     sortItemsPerDevice.put(
+         device.getFullPath(), queryStatement.getUpdatedSortItems(boundForDevice));
+     orderByPerDevice.put(device.getFullPath(), boundForDevice);
+   }
+
+   analysis.setOrderByExpressions(deviceViewOrderBy);
+   queryStatement.updateSortItems(deviceViewOrderBy);
+   analysis.setDeviceToSortItems(sortItemsPerDevice);
+   analysis.setDeviceToOrderByExpressions(orderByPerDevice);
+ }
+
+ /** Binds one ORDER BY expression to a device and requires exactly one resulting expression. */
+ private static Expression bindSingleOrderByExpression(
+     Expression sortExpression, PartialPath device, ISchemaTree schemaTree) {
+   List<Expression> candidates =
+       concatDeviceAndBindSchemaForExpression(sortExpression, device, schemaTree);
+   if (candidates.isEmpty()) {
+     throw new SemanticException(
+         String.format(
+             "%s in order by clause doesn't exist.", sortExpression.getExpressionString()));
+   }
+   if (candidates.size() > 1) {
+     throw new SemanticException(
+         String.format(
+             "%s in order by clause shouldn't refer to more than one timeseries.",
+             sortExpression.getExpressionString()));
+   }
+   return candidates.get(0);
+ }
+
+ /**
+  * Reuses the device-to-select-expressions map of {@code analysis} as its source transform
+  * expressions.
+  *
+  * <p>NOTE(review): nothing in this class populates the device-to-select-expressions map, so the
+  * value copied here may be unset; confirm that consumers tolerate this.
+  */
+ private static void analyzeDeviceToSourceTransform(Analysis analysis) {
+   analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
+ }
+
+ /**
+  * Computes the output expressions of the device view (select expressions plus, if present, the
+  * ORDER BY expressions) and records whether the device view needs special processing.
+  */
+ private static void analyzeDeviceViewOutput(Analysis analysis, QueryStatement queryStatement) {
+   // TODO if no order by, just set deviceViewOutputExpressions as selectExpressions
+   Set<Expression> viewOutputs = new LinkedHashSet<>(analysis.getSelectExpressions());
+   if (queryStatement.hasOrderByExpression()) {
+     viewOutputs.addAll(analysis.getOrderByExpressions());
+   }
+   analysis.setDeviceViewOutputExpressions(viewOutputs);
+
+   boolean specialProcess =
+       analyzeDeviceViewSpecialProcess(viewOutputs, queryStatement, analysis);
+   analysis.setDeviceViewSpecialProcess(specialProcess);
+ }
+
+ /**
+  * Builds the device-view input index map. Every device is mapped to the same list instance
+  * holding the indexes {@code 1 .. selectExpressions.size() - 1}.
+  */
+ private static void analyzeDeviceViewInput(Analysis analysis) {
+   int columnCount = analysis.getSelectExpressions().size();
+   List<Integer> sharedIndexes = new ArrayList<>();
+   // index-0 is `Device`
+   for (int column = 1; column < columnCount; column++) {
+     sharedIndexes.add(column);
+   }
+
+   Map<String, List<Integer>> indexesPerDevice = new HashMap<>();
+   for (PartialPath device : analysis.getDeviceList()) {
+     indexesPerDevice.put(device.getFullPath(), sharedIndexes);
+   }
+   analysis.setDeviceViewInputIndexesMap(indexesPerDevice);
+ }
+
+ /**
+  * Reuses the device-to-select-expressions map of {@code analysis} as both its source
+  * expressions and its output expressions.
+  *
+  * <p>NOTE(review): nothing in this class populates the device-to-select-expressions map, so the
+  * value copied here may be unset; confirm that consumers tolerate this.
+  */
+ private static void analyzeDeviceToSource(Analysis analysis) {
+   analysis.setDeviceToSourceExpressions(analysis.getDeviceToSelectExpressions());
+   analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions());
+ }
+
+ /**
+  * Fetches the data partitions of all devices in {@code analysis.getDeviceList()} and stores the
+  * result in {@code analysis}.
+  */
+ private static void analyzeDataPartition(
+     Analysis analysis, ISchemaTree schemaTree, IPartitionFetcher partitionFetcher) {
+   // TemplatedDevice has no views, so there is no need to use outputDeviceToQueriedDevicesMap
+   Set<String> deviceNames = new HashSet<>();
+   for (PartialPath device : analysis.getDeviceList()) {
+     deviceNames.add(device.getFullPath());
+   }
+
+   DataPartition dataPartition =
+       fetchDataPartitionByDevices(
+           deviceNames, schemaTree, analysis.getGlobalTimeFilter(), partitionFetcher);
+   analysis.setDataPartitionInfo(dataPartition);
+ }
+
+ /**
+  * Fetches the data partitions of the given devices for the time range described by
+  * {@code globalTimeFilter}, grouping the query parameters by the database each device belongs
+  * to. The elapsed time is always recorded under the PARTITION_FETCHER plan cost metric.
+  *
+  * @return the fetched data partition, or an empty one when no time range is satisfied
+  */
+ private static DataPartition fetchDataPartitionByDevices(
+     Set<String> deviceSet,
+     ISchemaTree schemaTree,
+     Filter globalTimeFilter,
+     IPartitionFetcher partitionFetcher) {
+   long startTime = System.nanoTime();
+   try {
+     Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> slotsAndFlags =
+         getTimePartitionSlotList(globalTimeFilter);
+     List<TTimePartitionSlot> timeSlots = slotsAndFlags.left;
+     Pair<Boolean, Boolean> rangeFlags = slotsAndFlags.right;
+
+     // there is no satisfied time range
+     if (timeSlots.isEmpty() && Boolean.FALSE.equals(rangeFlags.left)) {
+       return new DataPartition(
+           Collections.emptyMap(),
+           CONFIG.getSeriesPartitionExecutorClass(),
+           CONFIG.getSeriesPartitionSlotNum());
+     }
+
+     Map<String, List<DataPartitionQueryParam>> paramsPerDatabase = new HashMap<>();
+     for (String devicePath : deviceSet) {
+       String database = schemaTree.getBelongedDatabase(devicePath);
+       paramsPerDatabase
+           .computeIfAbsent(database, key -> new ArrayList<>())
+           .add(
+               new DataPartitionQueryParam(
+                   devicePath, timeSlots, rangeFlags.left, rangeFlags.right));
+     }
+
+     // either flag being set selects the unclosed-time-range variant of the fetch
+     if (rangeFlags.left || rangeFlags.right) {
+       return partitionFetcher.getDataPartitionWithUnclosedTimeRange(paramsPerDatabase);
+     }
+     return partitionFetcher.getDataPartition(paramsPerDatabase);
+   } finally {
+     QueryPlanCostMetricSet.getInstance()
+         .recordPlanCost(PARTITION_FETCHER, System.nanoTime() - startTime);
+   }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
index e408bfa9f1b..625f1e9d770 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
@@ -21,18 +21,29 @@ package org.apache.iotdb.db.queryengine.plan.analyze;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
public class TypeProvider {
private final Map<String, TSDataType> typeMap;
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+ // All Queries Devices Set In One Template
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+ private List<String> measurementList;
+ private List<IMeasurementSchema> schemaList;
+ private List<TSDataType> dataTypes;
+ private Set<String> allSensors;
+
public TypeProvider() {
this.typeMap = new HashMap<>();
}
@@ -41,6 +52,20 @@ public class TypeProvider {
this.typeMap = typeMap;
}
+ /**
+  * Creates a type provider with an empty type map. The four template-related fields are only
+  * assigned when {@code measurementList} is not null; otherwise they all stay unset.
+  *
+  * <p>NOTE(review): the {@code hashCode()} visible in this file only hashes {@code typeMap};
+  * confirm whether the template-related fields should take part in equality and serialization.
+  */
+ public TypeProvider(
+     List<String> measurementList,
+     List<IMeasurementSchema> schemaList,
+     List<TSDataType> dataTypes,
+     Set<String> allSensors) {
+   this.typeMap = new HashMap<>();
+   if (measurementList == null) {
+     return;
+   }
+   this.measurementList = measurementList;
+   this.schemaList = schemaList;
+   this.dataTypes = dataTypes;
+   this.allSensors = allSensors;
+ }
+
public TSDataType getType(String symbol) {
return typeMap.get(symbol);
}
@@ -52,10 +77,6 @@ public class TypeProvider {
}
}
- public boolean containsTypeInfoOf(String path) {
- return typeMap.containsKey(path);
- }
-
public void serialize(ByteBuffer byteBuffer) {
ReadWriteIOUtils.write(typeMap.size(), byteBuffer);
for (Map.Entry<String, TSDataType> entry : typeMap.entrySet()) {
@@ -100,4 +121,40 @@ public class TypeProvider {
public int hashCode() {
return Objects.hash(typeMap);
}
+
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+ // All Queries Devices Set In One Template
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+
+ /** Stores the given measurement name list; the list is kept by reference, not copied. */
+ public void setMeasurementList(List<String> measurementList) {
+   this.measurementList = measurementList;
+ }
+
+ /** Returns the stored measurement name list, or null if none has been set. */
+ public List<String> getMeasurementList() {
+   return measurementList;
+ }
+
+ /** Stores the given measurement schema list; the list is kept by reference, not copied. */
+ public void setSchemaList(List<IMeasurementSchema> schemaList) {
+   this.schemaList = schemaList;
+ }
+
+ /** Returns the stored measurement schema list, or null if none has been set. */
+ public List<IMeasurementSchema> getSchemaList() {
+   return schemaList;
+ }
+
+ /** Stores the given data type list; the list is kept by reference, not copied. */
+ public void setDataTypes(List<TSDataType> dataTypes) {
+   this.dataTypes = dataTypes;
+ }
+
+ /** Returns the stored data type list, or null if none has been set. */
+ public List<TSDataType> getDataTypes() {
+   return dataTypes;
+ }
+
+ /** Stores the given sensor name set; the set is kept by reference, not copied. */
+ public void setAllSensors(Set<String> allSensors) {
+   this.allSensors = allSensors;
+ }
+
+ /** Returns the stored sensor name set, or null if none has been set. */
+ public Set<String> getAllSensors() {
+   return allSensors;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index a002f99cb29..ce33e169b36 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -221,6 +221,7 @@ public class QueryExecution implements IQueryExecution {
checkTimeOutForQuery();
doLogicalPlan();
doDistributedPlan();
+
// update timeout after finishing plan stage
context.setTimeOut(
context.getTimeOut() - (System.currentTimeMillis() -
context.getStartTime()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 2d84088d4ed..69adef7d9dc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -135,7 +135,7 @@ import static
org.apache.iotdb.db.utils.constant.SqlConstant.MAX_TIME;
public class LogicalPlanBuilder {
- private PlanNode root;
+ protected PlanNode root;
private final MPPQueryContext context;
@@ -155,7 +155,7 @@ public class LogicalPlanBuilder {
return this;
}
- private void updateTypeProvider(Collection<Expression> expressions) {
+ void updateTypeProvider(Collection<Expression> expressions) {
if (expressions == null) {
return;
}
@@ -737,7 +737,7 @@ public class LogicalPlanBuilder {
}
}
- private PlanNode convergeWithTimeJoin(List<PlanNode> sourceNodes, Ordering
mergeOrder) {
+ protected PlanNode convergeWithTimeJoin(List<PlanNode> sourceNodes, Ordering
mergeOrder) {
PlanNode tmpNode;
if (sourceNodes.size() == 1) {
tmpNode = sourceNodes.get(0);
@@ -777,13 +777,16 @@ public class LogicalPlanBuilder {
? queryStatement.getRowOffset() + queryStatement.getRowLimit()
: queryStatement.getRowLimit();
+ // 1. has LIMIT and LIMIT_VALUE is no larger than 1000000,
+ // 2. `order by based on time` or `order by based on expression`,
+ // 3. no aggregation,
+ // when all of the above requirements are satisfied, use TopKNode.
if (!queryStatement.isAggregationQuery()
&& queryStatement.hasLimit()
&& queryStatement.getOrderByComponent() != null
&& !queryStatement.isOrderByBasedOnDevice()
&& limitValue <= LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE) {
- // order by time and order by expression with limit, can be optimized to
TopK implementation
TopKNode topKNode =
new TopKNode(
context.getQueryId().genPlanNodeId(),
@@ -794,7 +797,8 @@ public class LogicalPlanBuilder {
// if value filter exists, need add a LIMIT-NODE as the child node of
TopKNode
long valueFilterLimit = queryStatement.hasWhere() ? limitValue : -1;
- if ((queryStatement.isOrderByBasedOnTime() &&
!queryStatement.hasOrderByExpression())) {
+ // order by based on time, use TopKNode + SingleDeviceViewNode
+ if (queryStatement.isOrderByBasedOnTime() &&
!queryStatement.hasOrderByExpression()) {
addSingleDeviceViewNodes(
topKNode,
deviceNameToSourceNodesMap,
@@ -802,6 +806,7 @@ public class LogicalPlanBuilder {
deviceToMeasurementIndexesMap,
valueFilterLimit);
} else {
+ // order by based on expression, use TopKNode + DeviceViewNode
topKNode.addChild(
addDeviceViewNode(
orderByParameter,
@@ -813,9 +818,12 @@ public class LogicalPlanBuilder {
this.root = topKNode;
}
- // order by time + no limit, device can be optimized by
SingleDeviceViewNode and MergeSortNode
+ // 1. `order by based on time` + `no order by expression`,
+ // 2. no LIMIT or LIMIT_VALUE is larger than 1000000,
+ // when all of the above requirements are satisfied, use MergeSortNode.
else if (queryStatement.isOrderByBasedOnTime() &&
!queryStatement.hasOrderByExpression()) {
if (deviceNameToSourceNodesMap.size() == 1) {
+ // only one device, use DeviceViewNode, no need MergeSortNode
this.root =
addDeviceViewNode(
orderByParameter,
@@ -824,6 +832,7 @@ public class LogicalPlanBuilder {
deviceNameToSourceNodesMap,
-1);
} else {
+ // otherwise use MergeSortNode + SingleDeviceViewNode
MergeSortNode mergeSortNode =
new MergeSortNode(
context.getQueryId().genPlanNodeId(), orderByParameter,
outputColumnNames);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index a8bf4d38fec..18ddd5b1aca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -123,6 +123,10 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
@Override
public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext
context) {
+ if (analysis.isAllDevicesInOneTemplate()) {
+ return new TemplatedLogicalPlan(analysis, queryStatement,
context).visitQuery();
+ }
+
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
if (queryStatement.isLastQuery()) {
@@ -250,7 +254,7 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
queryStatement.getResultTimeOrder(),
analysis.getGlobalTimeFilter(),
0,
- pushDownLimitToScanNode(queryStatement),
+ pushDownLimitToScanNode(queryStatement, analysis),
analysis.isLastLevelUseWildcard())
.planWhereAndSourceTransform(
whereExpression,
@@ -353,7 +357,7 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
return planBuilder.getRoot();
}
- private long pushDownLimitToScanNode(QueryStatement queryStatement) {
+ static long pushDownLimitToScanNode(QueryStatement queryStatement, Analysis
analysis) {
// `order by time|device LIMIT N align by device` and no value filter,
// can push down limitValue to ScanNode
if (queryStatement.isAlignByDevice()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index b97dee886e9..e78d41133cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -233,6 +233,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.time.ZoneId;
@@ -261,6 +263,8 @@ import static
org.apache.iotdb.db.utils.TimestampPrecisionUtils.TIMESTAMP_PRECIS
/** This Visitor is responsible for transferring PlanNode Tree to Operator
Tree. */
public class OperatorTreeGenerator extends PlanVisitor<Operator,
LocalExecutionPlanContext> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(OperatorTreeGenerator.class);
+
private static final MPPDataExchangeManager MPP_DATA_EXCHANGE_MANAGER =
MPPDataExchangeService.getInstance().getMPPDataExchangeManager();
@@ -341,7 +345,10 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
seriesScanOptionsBuilder.withLimit(node.getLimit());
seriesScanOptionsBuilder.withOffset(node.getOffset());
AlignedPath seriesPath = node.getAlignedPath();
- seriesScanOptionsBuilder.withAllSensors(new
HashSet<>(seriesPath.getMeasurementList()));
+ seriesScanOptionsBuilder.withAllSensors(
+ context.getTypeProvider().getAllSensors() != null
+ ? context.getTypeProvider().getAllSensors()
+ : new HashSet<>(seriesPath.getMeasurementList()));
OperatorContext operatorContext =
context
@@ -357,7 +364,8 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
seriesPath,
node.getScanOrder(),
seriesScanOptionsBuilder.build(),
- node.isQueryAllSensors());
+ node.isQueryAllSensors(),
+ context.getTypeProvider().getDataTypes());
((DataDriverContext)
context.getDriverContext()).addSourceOperator(seriesScanOperator);
((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
@@ -1871,7 +1879,10 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getMergeOrder() == Ordering.ASC ? ASC_TIME_COMPARATOR :
DESC_TIME_COMPARATOR;
List<OutputColumn> outputColumns = generateOutputColumnsFromChildren(node);
List<ColumnMerger> mergers = createColumnMergers(outputColumns,
timeComparator);
- List<TSDataType> outputColumnTypes = getOutputColumnTypes(node,
context.getTypeProvider());
+ List<TSDataType> outputColumnTypes =
+ context.getTypeProvider().getMeasurementList() != null
+ ? getOutputColumnTypesOfTimeJoinNode(node)
+ : getOutputColumnTypes(node, context.getTypeProvider());
return new RowBasedTimeJoinOperator(
operatorContext,
@@ -2476,11 +2487,36 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
}
private List<TSDataType> getInputColumnTypes(PlanNode node, TypeProvider
typeProvider) {
- return node.getChildren().stream()
- .map(PlanNode::getOutputColumnNames)
- .flatMap(List::stream)
- .map(typeProvider::getType)
- .collect(Collectors.toList());
+ if (typeProvider.getMeasurementList() == null) {
+ return node.getChildren().stream()
+ .map(PlanNode::getOutputColumnNames)
+ .flatMap(List::stream)
+ .map(typeProvider::getType)
+ .collect(Collectors.toList());
+ } else {
+ return getInputColumnTypesUseTemplate(node, typeProvider);
+ }
+ }
+
+  /**
+   * Collects the input column types for a node of a templated-device plan by walking the
+   * subtree below {@code node} depth-first, in child order.
+   *
+   * <p>A {@code SeriesScanNode} contributes the series type of its path, an {@code
+   * AlignedSeriesScanNode} contributes one type per entry of its aligned path's schema list, and
+   * every other child is searched recursively.
+   *
+   * @param node the node whose children are inspected
+   * @param typeProvider only forwarded to the recursive calls; not consulted here
+   * @return the collected data types, in traversal order
+   */
+  private List<TSDataType> getInputColumnTypesUseTemplate(
+      PlanNode node, TypeProvider typeProvider) {
+    // Reached only for templated devices combined with a filter. The children of a
+    // FilterNode/TransformNode are expected to be TimeJoinNode or scan nodes; anything else is
+    // simply descended into.
+    List<TSDataType> columnTypes = new ArrayList<>();
+    for (PlanNode childNode : node.getChildren()) {
+      if (childNode instanceof SeriesScanNode) {
+        SeriesScanNode scanNode = (SeriesScanNode) childNode;
+        columnTypes.add(scanNode.getSeriesPath().getSeriesType());
+        continue;
+      }
+      if (childNode instanceof AlignedSeriesScanNode) {
+        AlignedPath alignedPath = ((AlignedSeriesScanNode) childNode).getAlignedPath();
+        columnTypes.addAll(
+            alignedPath.getSchemaList().stream()
+                .map(schema -> schema.getType())
+                .collect(Collectors.toList()));
+        continue;
+      }
+      columnTypes.addAll(getInputColumnTypesUseTemplate(childNode, typeProvider));
+    }
+    return columnTypes;
   }
private List<TSDataType> getOutputColumnTypes(PlanNode node, TypeProvider
typeProvider) {
@@ -2489,6 +2525,26 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
.collect(Collectors.toList());
}
+  /**
+   * Collects the output column types of a time-join node in a templated-device plan by walking
+   * its children in order and recursing into nested {@code TimeJoinNode}s.
+   *
+   * <p>A child of any other kind contributes no type; it is reported through the error log and
+   * skipped.
+   *
+   * @param node the time-join node whose children are inspected
+   * @return the collected data types, in child order
+   */
+  private List<TSDataType> getOutputColumnTypesOfTimeJoinNode(PlanNode node) {
+    // Only templated device situation can invoke this method,
+    // the children of TimeJoinNode can only be ScanNode or TimeJoinNode
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (PlanNode child : node.getChildren()) {
+      if (child instanceof SeriesScanNode) {
+        dataTypes.add(((SeriesScanNode) child).getSeriesPath().getSeriesType());
+      } else if (child instanceof AlignedSeriesScanNode) {
+        // NOTE(review): this adds a single type per aligned scan node, whereas
+        // getInputColumnTypesUseTemplate adds one type per schema of the aligned path.
+        // Confirm that one entry is what the time-join operator expects here.
+        dataTypes.add(((AlignedSeriesScanNode) child).getAlignedPath().getSeriesType());
+      } else if (child instanceof TimeJoinNode) {
+        dataTypes.addAll(getOutputColumnTypesOfTimeJoinNode(child));
+      } else {
+        // Log the node's class: the placeholder is labelled "type", so the column names that
+        // were passed before did not identify the offending node kind.
+        LOGGER.error(
+            "Unexpected PlanNode in getOutputColumnTypesOfTimeJoinNode, type: {}",
+            child.getClass().getSimpleName());
+      }
+    }
+    return dataTypes;
+  }
+
private Operator generateOnlyChildOperator(PlanNode node,
LocalExecutionPlanContext context) {
List<Operator> children =
node.getChildren().stream()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
index b8920cffbc7..8d4df010115 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.SimplePlanVisitor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTagNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
@@ -45,7 +46,12 @@ public class SubPlanTypeExtractor {
private SubPlanTypeExtractor() {}
public static TypeProvider extractor(PlanNode root, TypeProvider allTypes) {
- TypeProvider typeProvider = new TypeProvider();
+ TypeProvider typeProvider =
+ new TypeProvider(
+ allTypes.getMeasurementList(),
+ allTypes.getSchemaList(),
+ allTypes.getDataTypes(),
+ allTypes.getAllSensors());
root.accept(new Visitor(typeProvider, allTypes), null);
return typeProvider;
}
@@ -156,6 +162,14 @@ public class SubPlanTypeExtractor {
return visitPlan(node, context);
}
+    /**
+     * Handles a {@code SingleDeviceViewNode}: when the type provider already carries a
+     * measurement list the subtree is not visited and {@code null} is returned; otherwise the
+     * node is handled by {@code visitPlan}.
+     */
+    @Override
+    public Void visitSingleDeviceView(SingleDeviceViewNode node, Void context) {
+      boolean hasMeasurementList = typeProvider.getMeasurementList() != null;
+      return hasMeasurementList ? null : visitPlan(node, context);
+    }
+
// end region PlanNode of last read
private void updateTypeProviderByAggregationDescriptor(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
new file mode 100644
index 00000000000..ead71d394f9
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner;
+
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchSourceExpressions;
+import static
org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanVisitor.pushDownLimitToScanNode;
+
+/**
+ * This class provides accelerated implementation for multiple devices align by device query. This
+ * optimization is only used for devices set in only one template, using template can avoid many
+ * unnecessary judgements.
+ *
+ * <p>The measurement names and schemas are read once from the {@link Analysis} and reused for
+ * every device: {@link #visitQuery()} builds one sub plan per device through {@link
+ * #visitQueryBody} and then places the sub plans under a device view.
+ */
+public class TemplatedLogicalPlan {
+
+  private final Analysis analysis;
+  private final QueryStatement queryStatement;
+  private final MPPQueryContext context;
+  // measurement names and their schemas as reported by the analysis; index i of one list
+  // corresponds to index i of the other
+  private final List<String> measurementList;
+  private final List<IMeasurementSchema> schemaList;
+
+  /**
+   * @param analysis analysis result of the query; supplies the measurement and schema lists
+   * @param queryStatement the statement being planned
+   * @param context query context used for plan node ids and the type provider
+   */
+  public TemplatedLogicalPlan(
+      Analysis analysis, QueryStatement queryStatement, MPPQueryContext context) {
+    this.analysis = analysis;
+    this.queryStatement = queryStatement;
+    this.context = context;
+
+    measurementList = analysis.getMeasurementList();
+    schemaList = analysis.getMeasurementSchemaList();
+  }
+
+  /**
+   * Builds the whole logical plan: one sub plan per device, combined by a device view, followed
+   * by order-by (when it was not pushed down), fill, offset and limit.
+   *
+   * @return the root of the resulting plan
+   */
+  public PlanNode visitQuery() {
+    LogicalPlanBuilder planBuilder =
+        new TemplatedLogicalPlanBuilder(analysis, context, measurementList, schemaList);
+
+    // LinkedHashMap keeps the sub plans in the iteration order of analysis.getDeviceList()
+    Map<String, PlanNode> deviceToSubPlanMap = new LinkedHashMap<>();
+    for (PartialPath devicePath : analysis.getDeviceList()) {
+      String deviceName = devicePath.getFullPath();
+      PlanNode rootNode = visitQueryBody(devicePath, analysis, queryStatement, context);
+
+      LogicalPlanBuilder subPlanBuilder =
+          new TemplatedLogicalPlanBuilder(analysis, context, measurementList, schemaList)
+              .withNewRoot(rootNode);
+
+      // sortOperator push down
+      if (queryStatement.needPushDownSort()) {
+        subPlanBuilder =
+            subPlanBuilder.planOrderBy(
+                analysis.getDeviceToOrderByExpressions().get(deviceName),
+                analysis.getDeviceToSortItems().get(deviceName));
+      }
+      deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot());
+    }
+
+    // convert to ALIGN BY DEVICE view
+    planBuilder =
+        planBuilder.planDeviceView(
+            deviceToSubPlanMap,
+            analysis.getDeviceViewOutputExpressions(),
+            analysis.getDeviceViewInputIndexesMap(),
+            analysis.getSelectExpressions(),
+            queryStatement);
+
+    // record on the analysis that the device view was planned as a TopKNode
+    if (planBuilder.getRoot() instanceof TopKNode) {
+      analysis.setUseTopKNode();
+    }
+
+    // the sort was not pushed below the device view, so plan it above the device view
+    if (!queryStatement.needPushDownSort()) {
+      planBuilder =
+          planBuilder.planOrderBy(
+              queryStatement, analysis.getOrderByExpressions(), analysis.getSelectExpressions());
+    }
+
+    // other upstream node
+    planBuilder =
+        planBuilder
+            .planFill(analysis.getFillDescriptor(), queryStatement.getResultTimeOrder())
+            .planOffset(queryStatement.getRowOffset())
+            .planLimit(queryStatement.getRowLimit());
+
+    return planBuilder.getRoot();
+  }
+
+  /**
+   * Builds the sub plan of a single device: the raw data scan and, when the device has a where
+   * expression, a filter on top of it.
+   *
+   * <p>The first invocation that finds no measurement list on the context's type provider also
+   * stores the (possibly merged) measurement names, schemas, data types and sensor set there.
+   *
+   * @param devicePath the device to plan
+   * @param analysis analysis result of the query ({@link #visitQuery()} passes the field)
+   * @param queryStatement the statement being planned ({@link #visitQuery()} passes the field)
+   * @param context query context ({@link #visitQuery()} passes the field)
+   * @return the root of the device's sub plan
+   */
+  public PlanNode visitQueryBody(
+      PartialPath devicePath,
+      Analysis analysis,
+      QueryStatement queryStatement,
+      MPPQueryContext context) {
+
+    List<String> mergedMeasurementList = measurementList;
+    List<IMeasurementSchema> mergedSchemaList = schemaList;
+
+    // A measurement that appears only in the where clause must be scanned too, e.g.
+    // `select s1 from root.** where s2>1 align by device`.
+    // NOTE(review): the original comment also mentioned `order by s2`, but only the where
+    // expression is inspected here - confirm order-by-only measurements are covered elsewhere.
+    Expression whereExpression =
+        analysis.getDeviceToWhereExpression() != null
+            ? analysis.getDeviceToWhereExpression().get(devicePath.getFullPath())
+            : null;
+    if (whereExpression != null && !analysis.isTemplateWildCardQuery()) {
+      // copy first so that the shared measurementList/schemaList fields are never modified
+      mergedMeasurementList = new ArrayList<>(measurementList);
+      mergedSchemaList = new ArrayList<>(schemaList);
+      Set<String> selectExpressions = new HashSet<>(measurementList);
+      List<Expression> whereSourceExpressions = searchSourceExpressions(whereExpression);
+      for (Expression expression : whereSourceExpressions) {
+        if (expression instanceof TimeSeriesOperand) {
+          String measurement = ((TimeSeriesOperand) expression).getPath().getMeasurement();
+          if (!selectExpressions.contains(measurement)) {
+            selectExpressions.add(measurement);
+            mergedMeasurementList.add(measurement);
+            mergedSchemaList.add(analysis.getDeviceTemplate().getSchema(measurement));
+          }
+        }
+      }
+    }
+
+    TemplatedLogicalPlanBuilder planBuilder =
+        new TemplatedLogicalPlanBuilder(analysis, context, mergedMeasurementList, mergedSchemaList);
+
+    planBuilder =
+        planBuilder.planRawDataSource(
+            devicePath,
+            queryStatement.getResultTimeOrder(),
+            analysis.getGlobalTimeFilter(),
+            0,
+            pushDownLimitToScanNode(queryStatement, analysis),
+            analysis.isLastLevelUseWildcard());
+
+    if (whereExpression != null) {
+      // The filter outputs the un-merged measurements only; measurements that were added just
+      // for the where clause are not part of the output.
+      Expression[] outputExpressions = new Expression[measurementList.size()];
+      // Bound the loop by the array itself so that its size and the index range cannot diverge.
+      for (int i = 0; i < outputExpressions.length; i++) {
+        outputExpressions[i] =
+            new TimeSeriesOperand(
+                new MeasurementPath(
+                    devicePath.concatNode(measurementList.get(i)).getNodes(), schemaList.get(i)));
+      }
+
+      planBuilder =
+          planBuilder.planFilter(
+              whereExpression,
+              outputExpressions,
+              queryStatement.isGroupByTime(),
+              queryStatement.getSelectComponent().getZoneId(),
+              queryStatement.getResultTimeOrder());
+    }
+
+    // Populate the type provider once; later devices see a non-null list and skip this.
+    // NOTE(review): this keeps the merged lists of the first device only - presumably every
+    // device yields the same merged lists; confirm.
+    if (context.getTypeProvider().getMeasurementList() == null) {
+      context.getTypeProvider().setMeasurementList(mergedMeasurementList);
+      context.getTypeProvider().setSchemaList(mergedSchemaList);
+      context
+          .getTypeProvider()
+          .setDataTypes(
+              mergedSchemaList.stream()
+                  .map(IMeasurementSchema::getType)
+                  .collect(Collectors.toList()));
+      context.getTypeProvider().setAllSensors(new HashSet<>(mergedMeasurementList));
+    }
+
+    return planBuilder.getRoot();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
new file mode 100644
index 00000000000..6770053fea5
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner;
+
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link LogicalPlanBuilder} for align by device queries whose devices are all set in one
+ * template. The measurement names and their schemas are handed in once at construction time and
+ * reused to build the scan nodes of each device.
+ */
+public class TemplatedLogicalPlanBuilder extends LogicalPlanBuilder {
+  private final MPPQueryContext context;
+
+  private final Analysis analysis;
+
+  // index i of measurementList corresponds to index i of schemaList
+  private final List<String> measurementList;
+  private final List<IMeasurementSchema> schemaList;
+
+  /**
+   * @param analysis analysis result of the query; supplies the device template
+   * @param context query context used to generate plan node ids
+   * @param measurementList names of the measurements to scan
+   * @param schemaList schemas matching {@code measurementList} by index
+   */
+  public TemplatedLogicalPlanBuilder(
+      Analysis analysis,
+      MPPQueryContext context,
+      List<String> measurementList,
+      List<IMeasurementSchema> schemaList) {
+    super(analysis, context);
+    this.analysis = analysis;
+    this.context = context;
+    this.measurementList = measurementList;
+    this.schemaList = schemaList;
+  }
+
+  /**
+   * Plans the raw data scan of one device and makes the result the new root. A directly aligned
+   * template yields one aligned scan node covering all measurements; otherwise one scan node is
+   * created per measurement. The scan nodes are then combined by {@code convergeWithTimeJoin}.
+   *
+   * @param devicePath the device to scan
+   * @param scanOrder scan order, also used as merge order of the time join
+   * @param timeFilter filter passed to every scan node
+   * @param offset offset passed to every scan node
+   * @param limit limit passed to every scan node
+   * @param lastLevelUseWildcard forwarded to the aligned scan node only
+   * @return this builder
+   */
+  public TemplatedLogicalPlanBuilder planRawDataSource(
+      PartialPath devicePath,
+      Ordering scanOrder,
+      Filter timeFilter,
+      long offset,
+      long limit,
+      boolean lastLevelUseWildcard) {
+    List<PlanNode> scanNodes = new ArrayList<>();
+
+    boolean aligned = analysis.getDeviceTemplate().isDirectAligned();
+    if (aligned) {
+      appendAlignedScanNode(
+          scanNodes, devicePath, scanOrder, timeFilter, offset, limit, lastLevelUseWildcard);
+    } else {
+      appendSeriesScanNodes(scanNodes, devicePath, scanOrder, timeFilter, offset, limit);
+    }
+
+    this.root = convergeWithTimeJoin(scanNodes, scanOrder);
+    return this;
+  }
+
+  /** Appends a single aligned scan node that reads every measurement of the device. */
+  private void appendAlignedScanNode(
+      List<PlanNode> scanNodes,
+      PartialPath devicePath,
+      Ordering scanOrder,
+      Filter timeFilter,
+      long offset,
+      long limit,
+      boolean lastLevelUseWildcard) {
+    AlignedPath alignedPath = new AlignedPath(devicePath);
+    alignedPath.setMeasurementList(measurementList);
+    alignedPath.addSchemas(schemaList);
+
+    scanNodes.add(
+        new AlignedSeriesScanNode(
+            context.getQueryId().genPlanNodeId(),
+            alignedPath,
+            scanOrder,
+            timeFilter,
+            timeFilter,
+            limit,
+            offset,
+            null,
+            lastLevelUseWildcard));
+  }
+
+  /** Appends one scan node per measurement, in the order of the measurement list. */
+  private void appendSeriesScanNodes(
+      List<PlanNode> scanNodes,
+      PartialPath devicePath,
+      Ordering scanOrder,
+      Filter timeFilter,
+      long offset,
+      long limit) {
+    int measurementCount = measurementList.size();
+    for (int idx = 0; idx < measurementCount; idx++) {
+      MeasurementPath seriesPath =
+          new MeasurementPath(devicePath.concatNode(measurementList.get(idx)), schemaList.get(idx));
+      scanNodes.add(
+          new SeriesScanNode(
+              context.getQueryId().genPlanNodeId(),
+              seriesPath,
+              scanOrder,
+              timeFilter,
+              timeFilter,
+              limit,
+              offset,
+              null));
+    }
+  }
+
+  /**
+   * Wraps the current root in a {@link FilterNode} and registers the filter expression with the
+   * type provider. Does nothing when {@code filterExpression} is {@code null}.
+   *
+   * @param filterExpression predicate to apply, may be {@code null}
+   * @param outputExpressions expressions output by the filter node
+   * @param isGroupByTime forwarded to the filter node
+   * @param zoneId forwarded to the filter node
+   * @param scanOrder forwarded to the filter node
+   * @return this builder
+   */
+  public TemplatedLogicalPlanBuilder planFilter(
+      Expression filterExpression,
+      Expression[] outputExpressions,
+      boolean isGroupByTime,
+      ZoneId zoneId,
+      Ordering scanOrder) {
+
+    if (filterExpression != null) {
+      this.root =
+          new FilterNode(
+              context.getQueryId().genPlanNodeId(),
+              this.getRoot(),
+              outputExpressions,
+              filterExpression,
+              isGroupByTime,
+              zoneId,
+              scanOrder);
+
+      updateTypeProvider(Collections.singletonList(filterExpression));
+    }
+
+    return this;
+  }
+
+  /** Replaces the current root and returns this builder with its templated type preserved. */
+  @Override
+  public TemplatedLogicalPlanBuilder withNewRoot(PlanNode newRoot) {
+    this.root = newRoot;
+    return this;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index d22958a5a1f..79d9f6beb96 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -381,9 +381,6 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
TopKNode rootNode = (TopKNode) node;
Map<TRegionReplicaSet, TopKNode> regionTopKNodeMap = new HashMap<>();
for (PlanNode child : visitedChildren) {
- if (child instanceof SingleDeviceViewNode) {
- ((SingleDeviceViewNode) child).setCacheOutputColumnNames(true);
- }
TRegionReplicaSet region =
context.getNodeDistribution(child.getPlanNodeId()).region;
regionTopKNodeMap
.computeIfAbsent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 02e160864dd..fb07fe5a738 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -76,7 +76,6 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
-import static com.google.common.base.Preconditions.checkArgument;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanContext> {
@@ -148,9 +147,10 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
@Override
public List<PlanNode> visitDeviceView(DeviceViewNode node,
DistributionPlanContext context) {
- checkArgument(
- node.getDevices().size() == node.getChildren().size(),
- "size of devices and its children in DeviceViewNode should be same");
+ if (node.getDevices().size() != node.getChildren().size()) {
+ throw new IllegalArgumentException(
+ "size of devices and its children in DeviceViewNode should be same");
+ }
// If the DeviceView is mixed with Function that need to merge data from
different Data Region,
// it should be processed by a special logic.
@@ -162,17 +162,21 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
Set<TRegionReplicaSet> relatedDataRegions = new HashSet<>();
List<DeviceViewSplit> deviceViewSplits = new ArrayList<>();
+
// Step 1: constructs DeviceViewSplit
- Map<String, List<String>> outputDeviceToQueriedDevicesMap =
+ Map<String, String> outputDeviceToQueriedDevicesMap =
analysis.getOutputDeviceToQueriedDevicesMap();
for (int i = 0; i < node.getDevices().size(); i++) {
String outputDevice = node.getDevices().get(i);
PlanNode child = node.getChildren().get(i);
- List<TRegionReplicaSet> regionReplicaSets = new ArrayList<>();
- for (String queriedDevice :
outputDeviceToQueriedDevicesMap.get(outputDevice)) {
- regionReplicaSets.addAll(
- analysis.getPartitionInfo(queriedDevice,
analysis.getGlobalTimeFilter()));
- }
+ List<TRegionReplicaSet> regionReplicaSets =
+ analysis.isAllDevicesInOneTemplate()
+ ? new ArrayList<>(
+ analysis.getPartitionInfo(outputDevice,
analysis.getGlobalTimeFilter()))
+ : new ArrayList<>(
+ analysis.getPartitionInfo(
+ outputDeviceToQueriedDevicesMap.get(outputDevice),
+ analysis.getGlobalTimeFilter()));
deviceViewSplits.add(new DeviceViewSplit(outputDevice, child,
regionReplicaSets));
relatedDataRegions.addAll(regionReplicaSets);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java
index 35d825fe074..b116bc1950b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/traverser/Traverser.java
@@ -185,6 +185,7 @@ public abstract class Traverser<R, N extends IMNode<N>>
extends AbstractTreeVisi
@Override
protected Iterator<N> getChildrenIterator(N parent, Iterator<String>
childrenName)
throws Exception {
+
return new IMNodeIterator<N>() {
private N next = null;
private boolean skipTemplateChildren = false;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
index 4ab3237998f..4965a212ac5 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
@@ -128,7 +128,8 @@ public class AlignedSeriesScanOperatorTest {
alignedPath,
Ordering.ASC,
getDefaultSeriesScanOptions(alignedPath),
- false);
+ false,
+ null);
seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources,
unSeqResources));
seriesScanOperator
.getOperatorContext()
@@ -222,7 +223,8 @@ public class AlignedSeriesScanOperatorTest {
alignedPath1,
Ordering.ASC,
getDefaultSeriesScanOptions(alignedPath1),
- false);
+ false,
+ null);
seriesScanOperator1.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
seriesScanOperator1
.getOperatorContext()
@@ -244,7 +246,8 @@ public class AlignedSeriesScanOperatorTest {
alignedPath2,
Ordering.ASC,
getDefaultSeriesScanOptions(alignedPath2),
- false);
+ false,
+ null);
seriesScanOperator2.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
seriesScanOperator2
.getOperatorContext()
@@ -514,7 +517,8 @@ public class AlignedSeriesScanOperatorTest {
alignedPath1,
Ordering.DESC,
getDefaultSeriesScanOptions(alignedPath1),
- false);
+ false,
+ null);
seriesScanOperator1.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
seriesScanOperator1
.getOperatorContext()
@@ -536,7 +540,8 @@ public class AlignedSeriesScanOperatorTest {
alignedPath2,
Ordering.DESC,
getDefaultSeriesScanOptions(alignedPath2),
- false);
+ false,
+ null);
seriesScanOperator2.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
seriesScanOperator2
.getOperatorContext()
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
index edb2de5272c..4ee62b7dbd4 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
@@ -193,7 +193,8 @@ public class OperatorMemoryTest {
alignedPath,
Ordering.ASC,
SeriesScanOptions.getDefaultSeriesScanOptions(alignedPath),
- false);
+ false,
+ null);
long maxPeekMemory =
Math.max(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
index 8961c1e207f..8af58bb3c5a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
@@ -40,7 +40,7 @@ public class DistributionPlannerCycleTest {
// Query sql: `select * from root.sg.d1,root.sg.d2`
// root.sg.d1 has 2 SeriesScanNodes, root.sg.d2 has 3 SeriesScanNodes.
//
- //
------------------------------------------------------------------------------------------------
+ //
-----------------------------------------------------------------------------------------
// Note: d1.s1[1] means a SeriesScanNode with target series d1.s1 and its
data region is 1
//
// IdentityNode
@@ -51,7 +51,7 @@ public class DistributionPlannerCycleTest {
// TimeJoinNode
// / \ \
// d2.s1[2] d2.s2[2]
d2.s3[2]
- //
------------------------------------------------------------------------------------------------
+ //
------------------------------------------------------------------------------------------
@Test
public void timeJoinNodeTest() {
QueryId queryId = new QueryId("test");
@@ -67,12 +67,13 @@ public class DistributionPlannerCycleTest {
assertEquals(2, plan.getInstances().size());
PlanNode firstNode =
plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
- PlanNode secondNode =
-
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
assertEquals(3, firstNode.getChildren().size());
assertTrue(firstNode.getChildren().get(0) instanceof SeriesScanNode);
assertTrue(firstNode.getChildren().get(1) instanceof SeriesScanNode);
assertTrue(firstNode.getChildren().get(2) instanceof ExchangeNode);
+
+ PlanNode secondNode =
+
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
assertEquals(3, secondNode.getChildren().size());
assertTrue(secondNode.getChildren().get(0) instanceof SeriesScanNode);
assertTrue(secondNode.getChildren().get(1) instanceof SeriesScanNode);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util.java
index 2fef41699a1..f6c3e4bdf68 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util.java
@@ -344,7 +344,7 @@ public class Util {
@Override
public Map<Integer, Template> checkAllRelatedTemplate(PartialPath
pathPattern) {
- return null;
+ return Collections.emptyMap();
}
@Override
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
index 5124c61c28b..89a072b0792 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
@@ -71,6 +71,10 @@ import java.util.Map;
public class Util2 {
public static final Analysis ANALYSIS = constructAnalysis();
+ private static final String device1 = "root.sg.d1";
+ private static final String device2 = "root.sg.d2";
+ private static final String device3 = "root.sg.d3";
+
public static Analysis constructAnalysis() {
TRegionReplicaSet dataRegion1 =
new TRegionReplicaSet(
@@ -90,23 +94,21 @@ public class Util2 {
Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new
HashMap<>();
d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions);
- DataPartition dataPartition =
- new DataPartition(
-
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
-
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
dataPartitionMap = new HashMap<>();
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> sgPartitionMap =
new HashMap<>();
- String device1 = "root.sg.d1";
- String device2 = "root.sg.d2";
- String device3 = "root.sg.d3";
+
SeriesPartitionExecutor executor =
SeriesPartitionExecutor.getSeriesPartitionExecutor(
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
sgPartitionMap.put(executor.getSeriesPartitionSlot(device1),
d1DataRegionMap);
sgPartitionMap.put(executor.getSeriesPartitionSlot(device2),
d2DataRegionMap);
+ DataPartition dataPartition =
+ new DataPartition(
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
dataPartitionMap.put("root.sg", sgPartitionMap);
dataPartition.setDataPartitionMap(dataPartitionMap);