This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/count_time_fe
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/count_time_fe by this
push:
new 17a9c5bcfe5 add align be device optimization
17a9c5bcfe5 is described below
commit 17a9c5bcfe5002e98e1bce0aac02719e04840962
Author: Beyyes <[email protected]>
AuthorDate: Tue Aug 22 09:03:52 2023 +0800
add align be device optimization
---
.../aggregation/IoTDBCountTimeAlignedDeviceIT.java | 350 +++++++++++++++++++++
.../plan/planner/LogicalPlanBuilder.java | 60 +++-
.../plan/planner/LogicalPlanVisitor.java | 35 ++-
3 files changed, 436 insertions(+), 9 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBCountTimeAlignedDeviceIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBCountTimeAlignedDeviceIT.java
new file mode 100644
index 00000000000..c20c0c4861a
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBCountTimeAlignedDeviceIT.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.assertTestFail;
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+import static
org.apache.iotdb.db.queryengine.plan.expression.visitor.CountTimeAggregationAmountVisitor.COUNT_TIME_ONLY_SUPPORT_ONE_WILDCARD;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBCountTimeAlignedDeviceIT {
+
+ protected static final String[] SQL_LIST =
+ new String[] { // NOTE(review): IoTDB documents aligned creation as CREATE ALIGNED TIMESERIES device(measurement type, ...); confirm the per-series WITH DATATYPE form used below is accepted
+ // data for normalQueryTest and testUnSupportedSql (root.aligned.db): d1 gets timestamps {1, 2}, d2 gets {1}
+ "CREATE DATABASE root.aligned.db;",
+ "CREATE ALIGNED TIMESERIES root.aligned.db.d1.s1 WITH DATATYPE=INT32,
ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.db.d1.s2 WITH DATATYPE=INT32,
ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.db.d2.s1 WITH DATATYPE=INT32,
ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.db.d2.s2 WITH DATATYPE=INT32,
ENCODING=PLAIN;",
+ "INSERT INTO root.aligned.db.d1(time, s1) VALUES(1, 1);",
+ "INSERT INTO root.aligned.db.d1(time, s2) VALUES(2, 2);",
+ "INSERT INTO root.aligned.db.d2(time, s2) VALUES(1, 1);",
+ // data for groupByTimeTest (root.aligned.downsampling)
+ "CREATE DATABASE root.aligned.downsampling;",
+ "CREATE ALIGNED TIMESERIES root.aligned.downsampling.d1.s1 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.downsampling.d1.s2 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.downsampling.d2.s1 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.downsampling.d2.s2 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "INSERT INTO root.aligned.downsampling.d1(time, s1) VALUES(0, 0),
(4,4), (5,5), (8,8);",
+ "INSERT INTO root.aligned.downsampling.d1(time, s2) VALUES(1, 1),
(2,2), (5,5), (7,7), (8,8), (9,9);",
+ "INSERT INTO root.aligned.downsampling.d2(time, s1) VALUES(1, 1),
(2,2), (5,5), (7,7), (8,8);",
+ "INSERT INTO root.aligned.downsampling.d2(time, s2) VALUES(0, 0),
(4,4), (5,5), (8,8);",
+ // data for groupByVariationTest (root.aligned.variation)
+ "CREATE DATABASE root.aligned.variation;",
+ "CREATE ALIGNED TIMESERIES root.aligned.variation.d1.state WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.variation.d1.s1 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.variation.d2.state WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.variation.d2.s1 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "INSERT INTO root.aligned.variation.d1(time, state) VALUES(0,0),
(1,0), (3,0), (4,0),(5,1),(6,1);",
+ "INSERT INTO root.aligned.variation.d1(time, s1) VALUES(0,0), (2,2),
(3,3), (6,6);",
+ "INSERT INTO root.aligned.variation.d2(time, state) VALUES(0,0),
(2,1), (3,1), (4,1), (6,1);",
+ "INSERT INTO root.aligned.variation.d2(time, s1) VALUES(1,1), (2,2),
(3,3);",
+ // data for groupBySessionTest (root.aligned.session)
+ "CREATE DATABASE root.aligned.session;",
+ "CREATE ALIGNED TIMESERIES root.aligned.session.d1.state WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.session.d1.s1 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.session.d2.state WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.session.d2.s1 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "INSERT INTO root.aligned.session.d1(time, state) VALUES(0,0), (1,0),
(20,0), (23,0),(40,0),(55,1),(56,1);",
+ "INSERT INTO root.aligned.session.d1(time, s1) VALUES(0,0), (20,2),
(23,3), (56,6);",
+ "INSERT INTO root.aligned.session.d2(time, state) VALUES(0,0), (20,1),
(23,1), (40,1), (56,1);",
+ "INSERT INTO root.aligned.session.d2(time, s1) VALUES(1,1), (20,2),
(23,3);",
+ // data for groupByConditionTest (root.aligned.condition)
+ "CREATE DATABASE root.aligned.condition;",
+ "CREATE ALIGNED TIMESERIES root.aligned.condition.d1.state WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.condition.d1.s1 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.condition.d1.s2 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.condition.d2.state WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.condition.d2.s1 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "INSERT INTO root.aligned.condition.d1(time, state) VALUES(0,0),
(1,1), (23,1),(40,0),(55,1),(56,1);",
+ "INSERT INTO root.aligned.condition.d1(time, s1) VALUES(0,0), (23,3),
(56,6);",
+ "INSERT INTO root.aligned.condition.d1(time, s2) VALUES(0,0), (1,1),
(20,2), (23,3);",
+ "INSERT INTO root.aligned.condition.d2(time, state) VALUES(0,0),
(20,1), (23,1), (40,1), (56,1);",
+ "INSERT INTO root.aligned.condition.d2(time, s1) VALUES(1,1), (20,2),
(23,3);",
+ // test having (root.aligned.having). NOTE(review): no test method in this class queries this database
+ "CREATE DATABASE root.aligned.having;",
+ "CREATE ALIGNED TIMESERIES root.aligned.having.d1.s1 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.having.d1.s2 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.having.d2.s1 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.having.d2.s2 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "INSERT INTO root.aligned.having.d1(time, s1) VALUES(0, 0), (4,4),
(5,5), (8,8);",
+ "INSERT INTO root.aligned.having.d1(time, s2) VALUES(1, 1), (2,2),
(5,5), (7,7), (8,8), (9,9);",
+ "INSERT INTO root.aligned.having.d2(time, s1) VALUES(1, 1), (2,2),
(5,5), (7,7), (8,8), (9,9);",
+ "INSERT INTO root.aligned.having.d2(time, s2) VALUES(0, 0), (4,4),
(5,5), (8,8);",
+ // test aligned (root.aligned.aligned): the only inserts in this list that use the ALIGNED keyword. NOTE(review): no test method in this class queries this database
+ "CREATE DATABASE root.aligned.aligned;",
+ "CREATE ALIGNED TIMESERIES root.aligned.aligned.d1.s1 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.aligned.d1.s2 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.aligned.d2.s1 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "CREATE ALIGNED TIMESERIES root.aligned.aligned.d2.s2 WITH
DATATYPE=INT32, ENCODING=PLAIN;",
+ "INSERT INTO root.aligned.aligned.d1(time, s1) ALIGNED VALUES(0, 0),
(4,4), (5,5), (8,8);",
+ "INSERT INTO root.aligned.aligned.d1(time, s2) ALIGNED VALUES(1, 1),
(2,2), (5,5), (7,7), (8,8), (9,9);",
+ "INSERT INTO root.aligned.aligned.d2(time, s1) ALIGNED VALUES(1, 1),
(2,2), (5,5), (7,7), (8,8);",
+ "INSERT INTO root.aligned.aligned.d2(time, s2) ALIGNED VALUES(0, 0),
(4,4), (5,5), (8,8);",
+ };
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ // TODO set: the reason for the partition interval of 1000 configured below is not stated - document it or drop it; it is applied before the environment is initialised
+
EnvFactory.getEnv().getConfig().getCommonConfig().setPartitionInterval(1000);
+ EnvFactory.getEnv().initClusterEnvironment();
+ prepareData(SQL_LIST); // hand the shared fixture statements in SQL_LIST to the test helper once for the whole class
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment(); // counterpart of initClusterEnvironment() called in setUp()
+ }
+
+ @Test
+ public void normalQueryTest() {
+ // align by time: a single row for all matched devices; expected 2 = distinct timestamps {1, 2} across d1 and d2
+ String[] expectedHeader = new String[] {"count_time(*)"};
+ String[] retArray = new String[] {"2,"};
+ resultSetEqualTest("SELECT COUNT_TIME(*) FROM root.aligned.db.**;",
expectedHeader, retArray);
+
+ expectedHeader = new String[] {"count_time(*)"};
+ retArray = new String[] {"2,"};
+ resultSetEqualTest(
+ "SELECT COUNT_TIME(*) FROM root.aligned.db.d1, root.aligned.db.d2;",
+ expectedHeader,
+ retArray);
+
+ // align by device: one row per device; d1 has timestamps {1, 2}, d2 only {1}
+ expectedHeader = new String[] {"Device,count_time(*)"};
+ retArray = new String[] {"root.aligned.db.d1,2,", "root.aligned.db.d2,1,"};
+ resultSetEqualTest(
+ "select count_time(*) from root.aligned.db.** align by device;",
expectedHeader, retArray);
+
+ expectedHeader = new String[] {"Device,count_time(*)"};
+ retArray = new String[] {"root.aligned.db.d1,2,", "root.aligned.db.d2,1,"};
+ resultSetEqualTest(
+ "select count_time(*) from root.aligned.db.d1,root.aligned.db.d2 align
by device;",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void groupByTimeTest() {
+ // align by time: 2ms windows over [0, 10); the two-device query is issued with the ** pattern and in both FROM orders against the same expected rows
+ String[] expectedHeader = new String[] {"Time,count_time(*)"};
+ String[] retArray = new String[] {"0,2,", "2,1,", "4,2,", "6,1,", "8,2,"};
+ resultSetEqualTest(
+ "SELECT count_time(*) FROM root.aligned.downsampling.** GROUP BY([0,
10), 2ms);",
+ expectedHeader,
+ retArray);
+ resultSetEqualTest(
+ "SELECT count_time(*) FROM
root.aligned.downsampling.d1,root.aligned.downsampling.d2 GROUP BY([0, 10),
2ms);",
+ expectedHeader,
+ retArray);
+ resultSetEqualTest(
+ "SELECT count_time(*) FROM
root.aligned.downsampling.d2,root.aligned.downsampling.d1 GROUP BY([0, 10),
2ms);",
+ expectedHeader,
+ retArray);
+
+ expectedHeader = new String[] {"Time,count_time(*)"}; // d1 alone: same per-window counts as the combined queries above
+ retArray = new String[] {"0,2,", "2,1,", "4,2,", "6,1,", "8,2,"};
+ resultSetEqualTest(
+ "SELECT count_time(*) FROM root.aligned.downsampling.d1 GROUP BY([0,
10), 2ms);",
+ expectedHeader,
+ retArray);
+
+ // align by device: rows are listed per device (all d1 windows, then all d2 windows) for every FROM order tried below
+ expectedHeader = new String[] {"Time,Device,count_time(*)"};
+ retArray =
+ new String[] {
+ "0,root.aligned.downsampling.d1,2,",
+ "2,root.aligned.downsampling.d1,1,",
+ "4,root.aligned.downsampling.d1,2,",
+ "6,root.aligned.downsampling.d1,1,",
+ "8,root.aligned.downsampling.d1,2,",
+ "0,root.aligned.downsampling.d2,2,",
+ "2,root.aligned.downsampling.d2,1,",
+ "4,root.aligned.downsampling.d2,2,",
+ "6,root.aligned.downsampling.d2,1,",
+ "8,root.aligned.downsampling.d2,1,",
+ };
+ resultSetEqualTest(
+ "SELECT count_time(*) FROM root.aligned.downsampling.** GROUP BY([0,
10), 2ms) ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+ resultSetEqualTest(
+ "SELECT count_time(*) FROM
root.aligned.downsampling.d1,root.aligned.downsampling.d2 GROUP BY([0, 10),
2ms) ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+ resultSetEqualTest(
+ "SELECT count_time(*) FROM
root.aligned.downsampling.d2,root.aligned.downsampling.d1 GROUP BY([0, 10),
2ms) ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+
+ // test sort: with ORDER BY count_time(*) every count-1 row is expected before any count-2 row
+ expectedHeader = new String[] {"Time,Device,count_time(*)"};
+ retArray =
+ new String[] {
+ "2,root.aligned.downsampling.d1,1,",
+ "6,root.aligned.downsampling.d1,1,",
+ "2,root.aligned.downsampling.d2,1,",
+ "6,root.aligned.downsampling.d2,1,",
+ "8,root.aligned.downsampling.d2,1,",
+ "0,root.aligned.downsampling.d1,2,",
+ "4,root.aligned.downsampling.d1,2,",
+ "8,root.aligned.downsampling.d1,2,",
+ "0,root.aligned.downsampling.d2,2,",
+ "4,root.aligned.downsampling.d2,2,",
+ };
+ resultSetEqualTest(
+ "SELECT count_time(*) FROM root.aligned.downsampling.** GROUP BY([0,
10), 2ms) ORDER BY count_time(*) ALIGN BY DEVICE;",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void groupByVariationTest() {
+ // align by time: with ignoreNull=False, timestamp 2 (where d1 has only an s1 value and no state value) is expected as its own group [2, 2]
+ String[] expectedHeader = new String[] {"Time,__endTime,count_time(*)"};
+ String[] retArray = new String[] {"0,1,2,", "2,2,1,", "3,4,2,", "5,6,2,"};
+ resultSetEqualTest(
+ "select __endTime, count_time(*) from root.aligned.variation.d1 group
by variation(state, 0, ignoreNull=False);",
+ expectedHeader,
+ retArray);
+
+ // align by device: groups are computed per device, with d1 rows before d2 rows for every FROM order tried below
+ expectedHeader = new String[] {"Time,Device,__endTime,count_time(*)"};
+ retArray =
+ new String[] {
+ "0,root.aligned.variation.d1,1,2,",
+ "2,root.aligned.variation.d1,2,1,",
+ "3,root.aligned.variation.d1,4,2,",
+ "5,root.aligned.variation.d1,6,2,",
+ "0,root.aligned.variation.d2,0,1,",
+ "1,root.aligned.variation.d2,1,1,",
+ "2,root.aligned.variation.d2,6,4,",
+ };
+ resultSetEqualTest(
+ "select __endTime, count_time(*) from root.aligned.variation.** "
+ + "group by variation(state, 0, ignoreNull=False) align by
device;",
+ expectedHeader,
+ retArray);
+ resultSetEqualTest(
+ "select __endTime, count_time(*) from
root.aligned.variation.d1,root.aligned.variation.d2 "
+ + "group by variation(state, 0, ignoreNull=False) align by
device;",
+ expectedHeader,
+ retArray);
+ resultSetEqualTest(
+ "select __endTime, count_time(*) from
root.aligned.variation.d2,root.aligned.variation.d1 "
+ + "group by variation(state, 0, ignoreNull=False) align by
device;",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void groupBySessionTest() {
+ // align by time: session(10ms) groups; the same rows are expected for the ** pattern and for both explicit device orders
+ String[] expectedHeader = new String[] {"Time,__endTime,count_time(*)"};
+ String[] retArray = new String[] {"0,1,2,", "20,23,2,", "40,40,1,",
"55,56,2,"};
+ resultSetEqualTest(
+ "select __endTime, count_time(*) from root.aligned.session.** group by
session(10ms);",
+ expectedHeader,
+ retArray);
+ resultSetEqualTest(
+ "select __endTime, count_time(*) from
root.aligned.session.d1,root.aligned.session.d2 group by session(10ms);",
+ expectedHeader,
+ retArray);
+ resultSetEqualTest(
+ "select __endTime, count_time(*) from
root.aligned.session.d2,root.aligned.session.d1 group by session(10ms);",
+ expectedHeader,
+ retArray);
+
+ // align by device: each device is grouped separately; d2 has no point at 55, so its last expected session is [56, 56]
+ expectedHeader = new String[] {"Time,Device,__endTime,count_time(*)"};
+ retArray =
+ new String[] {
+ "0,root.aligned.session.d1,1,2,",
+ "20,root.aligned.session.d1,23,2,",
+ "40,root.aligned.session.d1,40,1,",
+ "55,root.aligned.session.d1,56,2,",
+ "0,root.aligned.session.d2,1,2,",
+ "20,root.aligned.session.d2,23,2,",
+ "40,root.aligned.session.d2,40,1,",
+ "56,root.aligned.session.d2,56,1,",
+ };
+ resultSetEqualTest(
+ "select __endTime, count_time(*) from root.aligned.session.** group by
session(10ms) align by device;",
+ expectedHeader,
+ retArray);
+ resultSetEqualTest(
+ "select __endTime, count_time(*) from
root.aligned.session.d1,root.aligned.session.d2 group by session(10ms) align by
device;",
+ expectedHeader,
+ retArray);
+ resultSetEqualTest(
+ "select __endTime, count_time(*) from
root.aligned.session.d2,root.aligned.session.d1 group by session(10ms) align by
device;",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void groupByConditionTest() {
+ // align by time: only d1's run at timestamps 55-56 is expected to satisfy state=1 with KEEP>=2
+ String[] expectedHeader = new String[] {"Time,count_time(*)"};
+ String[] retArray = new String[] {"55,2,"};
+ resultSetEqualTest(
+ "select count_time(*) from root.aligned.condition.d1 group by
condition(state=1, KEEP>=2, ignoreNull=false);",
+ expectedHeader,
+ retArray);
+
+ // align by device: d1 keeps its group starting at 55; d2's group starting at 20 is expected to cover 4 timestamps
+ expectedHeader = new String[] {"Time,Device,count_time(*)"};
+ retArray = new String[] {"55,root.aligned.condition.d1,2,",
"20,root.aligned.condition.d2,4,"};
+ resultSetEqualTest(
+ "select count_time(*) from root.aligned.condition.** group by
condition(state=1, KEEP>=2, ignoreNull=false) align by device;",
+ expectedHeader,
+ retArray);
+ resultSetEqualTest(
+ "select count_time(*) from
root.aligned.condition.d1,root.aligned.condition.d2 group by condition(state=1,
KEEP>=2, ignoreNull=false) align by device;",
+ expectedHeader,
+ retArray);
+ resultSetEqualTest(
+ "select count_time(*) from
root.aligned.condition.d2,root.aligned.condition.d1 group by condition(state=1,
KEEP>=2, ignoreNull=false) align by device;",
+ expectedHeader,
+ retArray);
+ }
+
+ @Test
+ public void testUnSupportedSql() { // COUNT_TIME with a named measurement (s1) instead of * is expected to fail with a semantic error
+ assertTestFail(
+ "SELECT COUNT_TIME(s1) FROM root.aligned.db.**;",
+ TSStatusCode.SEMANTIC_ERROR.getStatusCode() + ": " +
COUNT_TIME_ONLY_SUPPORT_ONE_WILDCARD);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 31684bf0915..54f2ce71dbc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
@@ -97,6 +98,8 @@ import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import com.google.common.base.Function;
import org.apache.commons.lang3.Validate;
@@ -118,6 +121,7 @@ import static
com.google.common.base.Preconditions.checkArgument;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE;
import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
public class LogicalPlanBuilder {
@@ -324,6 +328,7 @@ public class LogicalPlanBuilder {
boolean needCheckAscending = groupByTimeParameter == null;
Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new
HashMap<>();
Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new
HashMap<>();
+ Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations = new
HashMap<>();
for (Expression aggregationExpression : aggregationExpressions) {
createAggregationDescriptor(
(FunctionExpression) aggregationExpression,
@@ -331,13 +336,15 @@ public class LogicalPlanBuilder {
scanOrder,
needCheckAscending,
ascendingAggregations,
- descendingAggregations);
+ descendingAggregations,
+ countTimeAggregations);
}
List<PlanNode> sourceNodeList =
constructSourceNodeFromAggregationDescriptors(
ascendingAggregations,
descendingAggregations,
+ countTimeAggregations,
scanOrder,
timeFilter,
groupByTimeParameter);
@@ -372,6 +379,7 @@ public class LogicalPlanBuilder {
Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new
HashMap<>();
Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new
HashMap<>();
Map<AggregationDescriptor, Integer> aggregationToIndexMap = new
HashMap<>();
+ Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations = new
HashMap<>();
int index = 0;
for (Expression aggregationExpression : aggregationExpressions) {
@@ -382,7 +390,8 @@ public class LogicalPlanBuilder {
scanOrder,
needCheckAscending,
ascendingAggregations,
- descendingAggregations);
+ descendingAggregations,
+ countTimeAggregations);
aggregationToIndexMap.put(aggregationDescriptor,
deviceViewInputIndexes.get(index));
index++;
}
@@ -391,6 +400,7 @@ public class LogicalPlanBuilder {
constructSourceNodeFromAggregationDescriptors(
ascendingAggregations,
descendingAggregations,
+ countTimeAggregations,
scanOrder,
timeFilter,
groupByTimeParameter);
@@ -427,7 +437,8 @@ public class LogicalPlanBuilder {
Ordering scanOrder,
boolean needCheckAscending,
Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
- Map<PartialPath, List<AggregationDescriptor>> descendingAggregations) {
+ Map<PartialPath, List<AggregationDescriptor>> descendingAggregations,
+ Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations) {
AggregationDescriptor aggregationDescriptor =
new AggregationDescriptor(
sourceExpression.getFunctionName(),
@@ -437,6 +448,38 @@ public class LogicalPlanBuilder {
if (curStep.isOutputPartial()) {
updateTypeProviderByPartialAggregation(aggregationDescriptor,
context.getTypeProvider());
}
+
+ if (COUNT_TIME.equalsIgnoreCase(sourceExpression.getFunctionName())) {
+ Map<String, Pair<List<String>, List<IMeasurementSchema>>> map = new
HashMap<>();
+ for (Expression expression : sourceExpression.getCountTimeExpressions())
{
+ TimeSeriesOperand ts = (TimeSeriesOperand) expression;
+ PartialPath path = ts.getPath();
+ Pair<List<String>, List<IMeasurementSchema>> pair =
+ map.computeIfAbsent(
+ path.getDevice(), k -> new Pair<>(new ArrayList<>(), new
ArrayList<>()));
+ pair.left.add(path.getMeasurement());
+ try {
+ pair.right.add(path.getMeasurementSchema());
+ } catch (MetadataException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ for (Map.Entry<String, Pair<List<String>, List<IMeasurementSchema>>>
entry : map.entrySet()) {
+ String device = entry.getKey();
+ Pair<List<String>, List<IMeasurementSchema>> pair = entry.getValue();
+ AlignedPath alignedPath = null;
+ try {
+ alignedPath = new AlignedPath(device, pair.left, pair.right);
+ } catch (IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ countTimeAggregations.put(alignedPath,
Collections.singletonList(aggregationDescriptor));
+ }
+
+ return aggregationDescriptor;
+ }
+
PartialPath selectPath =
((TimeSeriesOperand)
sourceExpression.getExpressions().get(0)).getPath();
if (!needCheckAscending
@@ -456,13 +499,20 @@ public class LogicalPlanBuilder {
private List<PlanNode> constructSourceNodeFromAggregationDescriptors(
Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
Map<PartialPath, List<AggregationDescriptor>> descendingAggregations,
+ Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations,
Ordering scanOrder,
Filter timeFilter,
GroupByTimeParameter groupByTimeParameter) {
+
List<PlanNode> sourceNodeList = new ArrayList<>();
boolean needCheckAscending = groupByTimeParameter == null;
- Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations
=
- MetaUtils.groupAlignedAggregations(ascendingAggregations);
+ Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations
= null;
+ if (!countTimeAggregations.isEmpty()) {
+ groupedAscendingAggregations = countTimeAggregations;
+ } else {
+ groupedAscendingAggregations =
MetaUtils.groupAlignedAggregations(ascendingAggregations);
+ }
+
for (Map.Entry<PartialPath, List<AggregationDescriptor>>
pathAggregationsEntry :
groupedAscendingAggregations.entrySet()) {
sourceNodeList.add(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 86b98eda96a..58396e11ac5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.queryengine.plan.planner;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
@@ -25,6 +27,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -83,6 +86,8 @@ import
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -91,6 +96,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
+
/**
* This visitor is used to generate a logical plan for the statement and
returns the {@link
* PlanNode}.
@@ -248,7 +255,7 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
analysis.hasValueFilter()
|| analysis.hasGroupByParameter()
|| needTransform(sourceTransformExpressions)
- || cannotUseStatistics(aggregationExpressions);
+ || cannotUseStatistics(aggregationExpressions,
sourceTransformExpressions);
AggregationStep curStep;
if (isRawDataSource) {
planBuilder =
@@ -344,11 +351,31 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
return false;
}
- private boolean cannotUseStatistics(Set<Expression> expressions) {
+ private boolean cannotUseStatistics(
+ Set<Expression> expressions, Set<Expression> sourceTransformExpressions)
{
for (Expression expression : expressions) {
+
if (expression instanceof FunctionExpression) {
- if (!BuiltinAggregationFunction.canUseStatistics(
- ((FunctionExpression) expression).getFunctionName())) {
+ FunctionExpression functionExpression = (FunctionExpression)
expression;
+ if (COUNT_TIME.equalsIgnoreCase(functionExpression.getFunctionName()))
{
+ String alignedDeviceId = "";
+ for (Expression countTimeExpression : sourceTransformExpressions) {
+ TimeSeriesOperand ts = (TimeSeriesOperand) countTimeExpression;
+ if (!(ts.getPath() instanceof AlignedPath
+ || ((MeasurementPath) ts.getPath()).isUnderAlignedEntity())) {
+ return true;
+ }
+ if (StringUtils.isEmpty(alignedDeviceId)) {
+ alignedDeviceId = ts.getPath().getDevice();
+ } else if
(!alignedDeviceId.equalsIgnoreCase(ts.getPath().getDevice())) {
+ // count_time can use AlignedSeriesAggScan only when every series belongs to one aligned device
+ return true;
+ }
+ }
+ return false;
+ }
+
+ if
(!BuiltinAggregationFunction.canUseStatistics(functionExpression.getFunctionName()))
{
return true;
}
} else {