This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/count_time_fe
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/count_time_fe by this 
push:
     new 17a9c5bcfe5 add align be device optimization
17a9c5bcfe5 is described below

commit 17a9c5bcfe5002e98e1bce0aac02719e04840962
Author: Beyyes <[email protected]>
AuthorDate: Tue Aug 22 09:03:52 2023 +0800

    add align be device optimization
---
 .../aggregation/IoTDBCountTimeAlignedDeviceIT.java | 350 +++++++++++++++++++++
 .../plan/planner/LogicalPlanBuilder.java           |  60 +++-
 .../plan/planner/LogicalPlanVisitor.java           |  35 ++-
 3 files changed, 436 insertions(+), 9 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBCountTimeAlignedDeviceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBCountTimeAlignedDeviceIT.java
new file mode 100644
index 00000000000..c20c0c4861a
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBCountTimeAlignedDeviceIT.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.assertTestFail;
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+import static 
org.apache.iotdb.db.queryengine.plan.expression.visitor.CountTimeAggregationAmountVisitor.COUNT_TIME_ONLY_SUPPORT_ONE_WILDCARD;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBCountTimeAlignedDeviceIT {
+
+  protected static final String[] SQL_LIST =
+      new String[] {
+        // test normal query
+        "CREATE DATABASE root.aligned.db;",
+        "CREATE ALIGNED TIMESERIES root.aligned.db.d1.s1 WITH DATATYPE=INT32, 
ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.db.d1.s2 WITH DATATYPE=INT32, 
ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.db.d2.s1 WITH DATATYPE=INT32, 
ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.db.d2.s2 WITH DATATYPE=INT32, 
ENCODING=PLAIN;",
+        "INSERT INTO root.aligned.db.d1(time, s1) VALUES(1, 1);",
+        "INSERT INTO root.aligned.db.d1(time, s2) VALUES(2, 2);",
+        "INSERT INTO root.aligned.db.d2(time, s2) VALUES(1, 1);",
+        // test group by time
+        "CREATE DATABASE root.aligned.downsampling;",
+        "CREATE ALIGNED TIMESERIES root.aligned.downsampling.d1.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.downsampling.d1.s2 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.downsampling.d2.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.downsampling.d2.s2 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "INSERT INTO root.aligned.downsampling.d1(time, s1) VALUES(0, 0), 
(4,4), (5,5), (8,8);",
+        "INSERT INTO root.aligned.downsampling.d1(time, s2) VALUES(1, 1), 
(2,2), (5,5), (7,7), (8,8), (9,9);",
+        "INSERT INTO root.aligned.downsampling.d2(time, s1) VALUES(1, 1), 
(2,2), (5,5), (7,7), (8,8);",
+        "INSERT INTO root.aligned.downsampling.d2(time, s2) VALUES(0, 0), 
(4,4), (5,5), (8,8);",
+        // test group by variation
+        "CREATE DATABASE root.aligned.variation;",
+        "CREATE ALIGNED TIMESERIES root.aligned.variation.d1.state WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.variation.d1.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.variation.d2.state WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.variation.d2.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "INSERT INTO root.aligned.variation.d1(time, state) VALUES(0,0), 
(1,0), (3,0), (4,0),(5,1),(6,1);",
+        "INSERT INTO root.aligned.variation.d1(time, s1) VALUES(0,0), (2,2), 
(3,3), (6,6);",
+        "INSERT INTO root.aligned.variation.d2(time, state) VALUES(0,0), 
(2,1), (3,1), (4,1), (6,1);",
+        "INSERT INTO root.aligned.variation.d2(time, s1) VALUES(1,1), (2,2), 
(3,3);",
+        // test group by session
+        "CREATE DATABASE root.aligned.session;",
+        "CREATE ALIGNED TIMESERIES root.aligned.session.d1.state WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.session.d1.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.session.d2.state WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.session.d2.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "INSERT INTO root.aligned.session.d1(time, state) VALUES(0,0), (1,0), 
(20,0), (23,0),(40,0),(55,1),(56,1);",
+        "INSERT INTO root.aligned.session.d1(time, s1) VALUES(0,0), (20,2), 
(23,3), (56,6);",
+        "INSERT INTO root.aligned.session.d2(time, state) VALUES(0,0), (20,1), 
(23,1), (40,1), (56,1);",
+        "INSERT INTO root.aligned.session.d2(time, s1) VALUES(1,1), (20,2), 
(23,3);",
+        // test group by condition
+        "CREATE DATABASE root.aligned.condition;",
+        "CREATE ALIGNED TIMESERIES root.aligned.condition.d1.state WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.condition.d1.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.condition.d1.s2 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.condition.d2.state WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.condition.d2.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "INSERT INTO root.aligned.condition.d1(time, state) VALUES(0,0), 
(1,1), (23,1),(40,0),(55,1),(56,1);",
+        "INSERT INTO root.aligned.condition.d1(time, s1) VALUES(0,0), (23,3), 
(56,6);",
+        "INSERT INTO root.aligned.condition.d1(time, s2) VALUES(0,0), (1,1), 
(20,2), (23,3);",
+        "INSERT INTO root.aligned.condition.d2(time, state) VALUES(0,0), 
(20,1), (23,1), (40,1), (56,1);",
+        "INSERT INTO root.aligned.condition.d2(time, s1) VALUES(1,1), (20,2), 
(23,3);",
+        // test having
+        "CREATE DATABASE root.aligned.having;",
+        "CREATE ALIGNED TIMESERIES root.aligned.having.d1.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.having.d1.s2 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.having.d2.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.having.d2.s2 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "INSERT INTO root.aligned.having.d1(time, s1) VALUES(0, 0), (4,4), 
(5,5), (8,8);",
+        "INSERT INTO root.aligned.having.d1(time, s2) VALUES(1, 1), (2,2), 
(5,5), (7,7), (8,8), (9,9);",
+        "INSERT INTO root.aligned.having.d2(time, s1) VALUES(1, 1), (2,2), 
(5,5), (7,7), (8,8), (9,9);",
+        "INSERT INTO root.aligned.having.d2(time, s2) VALUES(0, 0), (4,4), 
(5,5), (8,8);",
+        // test aligned
+        "CREATE DATABASE root.aligned.aligned;",
+        "CREATE ALIGNED TIMESERIES root.aligned.aligned.d1.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.aligned.d1.s2 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.aligned.d2.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "CREATE ALIGNED TIMESERIES root.aligned.aligned.d2.s2 WITH 
DATATYPE=INT32, ENCODING=PLAIN;",
+        "INSERT INTO root.aligned.aligned.d1(time, s1) ALIGNED VALUES(0, 0), 
(4,4), (5,5), (8,8);",
+        "INSERT INTO root.aligned.aligned.d1(time, s2) ALIGNED VALUES(1, 1), 
(2,2), (5,5), (7,7), (8,8), (9,9);",
+        "INSERT INTO root.aligned.aligned.d2(time, s1) ALIGNED VALUES(1, 1), 
(2,2), (5,5), (7,7), (8,8);",
+        "INSERT INTO root.aligned.aligned.d2(time, s2) ALIGNED VALUES(0, 0), 
(4,4), (5,5), (8,8);",
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // TODO set
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setPartitionInterval(1000);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData(SQL_LIST);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void normalQueryTest() {
+    // align by time
+    String[] expectedHeader = new String[] {"count_time(*)"};
+    String[] retArray = new String[] {"2,"};
+    resultSetEqualTest("SELECT COUNT_TIME(*) FROM root.aligned.db.**;", 
expectedHeader, retArray);
+
+    expectedHeader = new String[] {"count_time(*)"};
+    retArray = new String[] {"2,"};
+    resultSetEqualTest(
+        "SELECT COUNT_TIME(*) FROM root.aligned.db.d1, root.aligned.db.d2;",
+        expectedHeader,
+        retArray);
+
+    // align by device
+    expectedHeader = new String[] {"Device,count_time(*)"};
+    retArray = new String[] {"root.aligned.db.d1,2,", "root.aligned.db.d2,1,"};
+    resultSetEqualTest(
+        "select count_time(*) from root.aligned.db.** align by device;", 
expectedHeader, retArray);
+
+    expectedHeader = new String[] {"Device,count_time(*)"};
+    retArray = new String[] {"root.aligned.db.d1,2,", "root.aligned.db.d2,1,"};
+    resultSetEqualTest(
+        "select count_time(*) from root.aligned.db.d1,root.aligned.db.d2 align 
by device;",
+        expectedHeader,
+        retArray);
+  }
+
+  @Test
+  public void groupByTimeTest() {
+    // align by time
+    String[] expectedHeader = new String[] {"Time,count_time(*)"};
+    String[] retArray = new String[] {"0,2,", "2,1,", "4,2,", "6,1,", "8,2,"};
+    resultSetEqualTest(
+        "SELECT count_time(*) FROM root.aligned.downsampling.** GROUP BY([0, 
10), 2ms);",
+        expectedHeader,
+        retArray);
+    resultSetEqualTest(
+        "SELECT count_time(*) FROM 
root.aligned.downsampling.d1,root.aligned.downsampling.d2 GROUP BY([0, 10), 
2ms);",
+        expectedHeader,
+        retArray);
+    resultSetEqualTest(
+        "SELECT count_time(*) FROM 
root.aligned.downsampling.d2,root.aligned.downsampling.d1 GROUP BY([0, 10), 
2ms);",
+        expectedHeader,
+        retArray);
+
+    expectedHeader = new String[] {"Time,count_time(*)"};
+    retArray = new String[] {"0,2,", "2,1,", "4,2,", "6,1,", "8,2,"};
+    resultSetEqualTest(
+        "SELECT count_time(*) FROM root.aligned.downsampling.d1 GROUP BY([0, 
10), 2ms);",
+        expectedHeader,
+        retArray);
+
+    // align by device
+    expectedHeader = new String[] {"Time,Device,count_time(*)"};
+    retArray =
+        new String[] {
+          "0,root.aligned.downsampling.d1,2,",
+          "2,root.aligned.downsampling.d1,1,",
+          "4,root.aligned.downsampling.d1,2,",
+          "6,root.aligned.downsampling.d1,1,",
+          "8,root.aligned.downsampling.d1,2,",
+          "0,root.aligned.downsampling.d2,2,",
+          "2,root.aligned.downsampling.d2,1,",
+          "4,root.aligned.downsampling.d2,2,",
+          "6,root.aligned.downsampling.d2,1,",
+          "8,root.aligned.downsampling.d2,1,",
+        };
+    resultSetEqualTest(
+        "SELECT count_time(*) FROM root.aligned.downsampling.** GROUP BY([0, 
10), 2ms) ALIGN BY DEVICE;",
+        expectedHeader,
+        retArray);
+    resultSetEqualTest(
+        "SELECT count_time(*) FROM 
root.aligned.downsampling.d1,root.aligned.downsampling.d2 GROUP BY([0, 10), 
2ms) ALIGN BY DEVICE;",
+        expectedHeader,
+        retArray);
+    resultSetEqualTest(
+        "SELECT count_time(*) FROM 
root.aligned.downsampling.d2,root.aligned.downsampling.d1 GROUP BY([0, 10), 
2ms) ALIGN BY DEVICE;",
+        expectedHeader,
+        retArray);
+
+    // test sort
+    expectedHeader = new String[] {"Time,Device,count_time(*)"};
+    retArray =
+        new String[] {
+          "2,root.aligned.downsampling.d1,1,",
+          "6,root.aligned.downsampling.d1,1,",
+          "2,root.aligned.downsampling.d2,1,",
+          "6,root.aligned.downsampling.d2,1,",
+          "8,root.aligned.downsampling.d2,1,",
+          "0,root.aligned.downsampling.d1,2,",
+          "4,root.aligned.downsampling.d1,2,",
+          "8,root.aligned.downsampling.d1,2,",
+          "0,root.aligned.downsampling.d2,2,",
+          "4,root.aligned.downsampling.d2,2,",
+        };
+    resultSetEqualTest(
+        "SELECT count_time(*) FROM root.aligned.downsampling.** GROUP BY([0, 
10), 2ms) ORDER BY count_time(*) ALIGN BY DEVICE;",
+        expectedHeader,
+        retArray);
+  }
+
+  @Test
+  public void groupByVariationTest() {
+    // align by time
+    String[] expectedHeader = new String[] {"Time,__endTime,count_time(*)"};
+    String[] retArray = new String[] {"0,1,2,", "2,2,1,", "3,4,2,", "5,6,2,"};
+    resultSetEqualTest(
+        "select __endTime, count_time(*) from root.aligned.variation.d1 group 
by variation(state, 0, ignoreNull=False);",
+        expectedHeader,
+        retArray);
+
+    // align by device
+    expectedHeader = new String[] {"Time,Device,__endTime,count_time(*)"};
+    retArray =
+        new String[] {
+          "0,root.aligned.variation.d1,1,2,",
+          "2,root.aligned.variation.d1,2,1,",
+          "3,root.aligned.variation.d1,4,2,",
+          "5,root.aligned.variation.d1,6,2,",
+          "0,root.aligned.variation.d2,0,1,",
+          "1,root.aligned.variation.d2,1,1,",
+          "2,root.aligned.variation.d2,6,4,",
+        };
+    resultSetEqualTest(
+        "select __endTime, count_time(*) from root.aligned.variation.** "
+            + "group by variation(state, 0, ignoreNull=False) align by 
device;",
+        expectedHeader,
+        retArray);
+    resultSetEqualTest(
+        "select __endTime, count_time(*) from 
root.aligned.variation.d1,root.aligned.variation.d2 "
+            + "group by variation(state, 0, ignoreNull=False) align by 
device;",
+        expectedHeader,
+        retArray);
+    resultSetEqualTest(
+        "select __endTime, count_time(*) from 
root.aligned.variation.d2,root.aligned.variation.d1 "
+            + "group by variation(state, 0, ignoreNull=False) align by 
device;",
+        expectedHeader,
+        retArray);
+  }
+
+  @Test
+  public void groupBySessionTest() {
+    // align by time
+    String[] expectedHeader = new String[] {"Time,__endTime,count_time(*)"};
+    String[] retArray = new String[] {"0,1,2,", "20,23,2,", "40,40,1,", 
"55,56,2,"};
+    resultSetEqualTest(
+        "select __endTime, count_time(*) from root.aligned.session.** group by 
session(10ms);",
+        expectedHeader,
+        retArray);
+    resultSetEqualTest(
+        "select __endTime, count_time(*) from 
root.aligned.session.d1,root.aligned.session.d2 group by session(10ms);",
+        expectedHeader,
+        retArray);
+    resultSetEqualTest(
+        "select __endTime, count_time(*) from 
root.aligned.session.d2,root.aligned.session.d1 group by session(10ms);",
+        expectedHeader,
+        retArray);
+
+    // align by device
+    expectedHeader = new String[] {"Time,Device,__endTime,count_time(*)"};
+    retArray =
+        new String[] {
+          "0,root.aligned.session.d1,1,2,",
+          "20,root.aligned.session.d1,23,2,",
+          "40,root.aligned.session.d1,40,1,",
+          "55,root.aligned.session.d1,56,2,",
+          "0,root.aligned.session.d2,1,2,",
+          "20,root.aligned.session.d2,23,2,",
+          "40,root.aligned.session.d2,40,1,",
+          "56,root.aligned.session.d2,56,1,",
+        };
+    resultSetEqualTest(
+        "select __endTime, count_time(*) from root.aligned.session.** group by 
session(10ms) align by device;",
+        expectedHeader,
+        retArray);
+    resultSetEqualTest(
+        "select __endTime, count_time(*) from 
root.aligned.session.d1,root.aligned.session.d2 group by session(10ms) align by 
device;",
+        expectedHeader,
+        retArray);
+    resultSetEqualTest(
+        "select __endTime, count_time(*) from 
root.aligned.session.d2,root.aligned.session.d1 group by session(10ms) align by 
device;",
+        expectedHeader,
+        retArray);
+  }
+
+  @Test
+  public void groupByConditionTest() {
+    // align by time
+    String[] expectedHeader = new String[] {"Time,count_time(*)"};
+    String[] retArray = new String[] {"55,2,"};
+    resultSetEqualTest(
+        "select count_time(*) from root.aligned.condition.d1 group by 
condition(state=1, KEEP>=2, ignoreNull=false);",
+        expectedHeader,
+        retArray);
+
+    // align by device
+    expectedHeader = new String[] {"Time,Device,count_time(*)"};
+    retArray = new String[] {"55,root.aligned.condition.d1,2,", 
"20,root.aligned.condition.d2,4,"};
+    resultSetEqualTest(
+        "select count_time(*) from root.aligned.condition.** group by 
condition(state=1, KEEP>=2, ignoreNull=false) align by device;",
+        expectedHeader,
+        retArray);
+    resultSetEqualTest(
+        "select count_time(*) from 
root.aligned.condition.d1,root.aligned.condition.d2 group by condition(state=1, 
KEEP>=2, ignoreNull=false) align by device;",
+        expectedHeader,
+        retArray);
+    resultSetEqualTest(
+        "select count_time(*) from 
root.aligned.condition.d2,root.aligned.condition.d1 group by condition(state=1, 
KEEP>=2, ignoreNull=false) align by device;",
+        expectedHeader,
+        retArray);
+  }
+
+  @Test
+  public void testUnSupportedSql() {
+    assertTestFail(
+        "SELECT COUNT_TIME(s1) FROM root.aligned.db.**;",
+        TSStatusCode.SEMANTIC_ERROR.getStatusCode() + ": " + 
COUNT_TIME_ONLY_SUPPORT_ONE_WILDCARD);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 31684bf0915..54f2ce71dbc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TAggregationType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -97,6 +98,8 @@ import org.apache.iotdb.db.schemaengine.template.Template;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 import com.google.common.base.Function;
 import org.apache.commons.lang3.Validate;
@@ -118,6 +121,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE;
 import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
 
 public class LogicalPlanBuilder {
 
@@ -324,6 +328,7 @@ public class LogicalPlanBuilder {
     boolean needCheckAscending = groupByTimeParameter == null;
     Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new 
HashMap<>();
     Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new 
HashMap<>();
+    Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations = new 
HashMap<>();
     for (Expression aggregationExpression : aggregationExpressions) {
       createAggregationDescriptor(
           (FunctionExpression) aggregationExpression,
@@ -331,13 +336,15 @@ public class LogicalPlanBuilder {
           scanOrder,
           needCheckAscending,
           ascendingAggregations,
-          descendingAggregations);
+          descendingAggregations,
+          countTimeAggregations);
     }
 
     List<PlanNode> sourceNodeList =
         constructSourceNodeFromAggregationDescriptors(
             ascendingAggregations,
             descendingAggregations,
+            countTimeAggregations,
             scanOrder,
             timeFilter,
             groupByTimeParameter);
@@ -372,6 +379,7 @@ public class LogicalPlanBuilder {
     Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new 
HashMap<>();
     Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new 
HashMap<>();
     Map<AggregationDescriptor, Integer> aggregationToIndexMap = new 
HashMap<>();
+    Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations = new 
HashMap<>();
 
     int index = 0;
     for (Expression aggregationExpression : aggregationExpressions) {
@@ -382,7 +390,8 @@ public class LogicalPlanBuilder {
               scanOrder,
               needCheckAscending,
               ascendingAggregations,
-              descendingAggregations);
+              descendingAggregations,
+              countTimeAggregations);
       aggregationToIndexMap.put(aggregationDescriptor, 
deviceViewInputIndexes.get(index));
       index++;
     }
@@ -391,6 +400,7 @@ public class LogicalPlanBuilder {
         constructSourceNodeFromAggregationDescriptors(
             ascendingAggregations,
             descendingAggregations,
+            countTimeAggregations,
             scanOrder,
             timeFilter,
             groupByTimeParameter);
@@ -427,7 +437,8 @@ public class LogicalPlanBuilder {
       Ordering scanOrder,
       boolean needCheckAscending,
       Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
-      Map<PartialPath, List<AggregationDescriptor>> descendingAggregations) {
+      Map<PartialPath, List<AggregationDescriptor>> descendingAggregations,
+      Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations) {
     AggregationDescriptor aggregationDescriptor =
         new AggregationDescriptor(
             sourceExpression.getFunctionName(),
@@ -437,6 +448,38 @@ public class LogicalPlanBuilder {
     if (curStep.isOutputPartial()) {
       updateTypeProviderByPartialAggregation(aggregationDescriptor, 
context.getTypeProvider());
     }
+
+    if (COUNT_TIME.equalsIgnoreCase(sourceExpression.getFunctionName())) {
+      Map<String, Pair<List<String>, List<IMeasurementSchema>>> map = new 
HashMap<>();
+      for (Expression expression : sourceExpression.getCountTimeExpressions()) 
{
+        TimeSeriesOperand ts = (TimeSeriesOperand) expression;
+        PartialPath path = ts.getPath();
+        Pair<List<String>, List<IMeasurementSchema>> pair =
+            map.computeIfAbsent(
+                path.getDevice(), k -> new Pair<>(new ArrayList<>(), new 
ArrayList<>()));
+        pair.left.add(path.getMeasurement());
+        try {
+          pair.right.add(path.getMeasurementSchema());
+        } catch (MetadataException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+
+      for (Map.Entry<String, Pair<List<String>, List<IMeasurementSchema>>> 
entry : map.entrySet()) {
+        String device = entry.getKey();
+        Pair<List<String>, List<IMeasurementSchema>> pair = entry.getValue();
+        AlignedPath alignedPath = null;
+        try {
+          alignedPath = new AlignedPath(device, pair.left, pair.right);
+        } catch (IllegalPathException e) {
+          throw new RuntimeException(e);
+        }
+        countTimeAggregations.put(alignedPath, 
Collections.singletonList(aggregationDescriptor));
+      }
+
+      return aggregationDescriptor;
+    }
+
     PartialPath selectPath =
         ((TimeSeriesOperand) 
sourceExpression.getExpressions().get(0)).getPath();
     if (!needCheckAscending
@@ -456,13 +499,20 @@ public class LogicalPlanBuilder {
   private List<PlanNode> constructSourceNodeFromAggregationDescriptors(
       Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
       Map<PartialPath, List<AggregationDescriptor>> descendingAggregations,
+      Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations,
       Ordering scanOrder,
       Filter timeFilter,
       GroupByTimeParameter groupByTimeParameter) {
+
     List<PlanNode> sourceNodeList = new ArrayList<>();
     boolean needCheckAscending = groupByTimeParameter == null;
-    Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations 
=
-        MetaUtils.groupAlignedAggregations(ascendingAggregations);
+    Map<PartialPath, List<AggregationDescriptor>> groupedAscendingAggregations 
= null;
+    if (!countTimeAggregations.isEmpty()) {
+      groupedAscendingAggregations = countTimeAggregations;
+    } else {
+      groupedAscendingAggregations = 
MetaUtils.groupAlignedAggregations(ascendingAggregations);
+    }
+
     for (Map.Entry<PartialPath, List<AggregationDescriptor>> 
pathAggregationsEntry :
         groupedAscendingAggregations.entrySet()) {
       sourceNodeList.add(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 86b98eda96a..58396e11ac5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.queryengine.plan.planner;
 
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
 import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
@@ -25,6 +27,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
 import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -83,6 +86,8 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.schemaengine.template.Template;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -91,6 +96,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
+
 /**
  * This visitor is used to generate a logical plan for the statement and 
returns the {@link
  * PlanNode}.
@@ -248,7 +255,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
           analysis.hasValueFilter()
               || analysis.hasGroupByParameter()
               || needTransform(sourceTransformExpressions)
-              || cannotUseStatistics(aggregationExpressions);
+              || cannotUseStatistics(aggregationExpressions, 
sourceTransformExpressions);
       AggregationStep curStep;
       if (isRawDataSource) {
         planBuilder =
@@ -344,11 +351,31 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
     return false;
   }
 
-  private boolean cannotUseStatistics(Set<Expression> expressions) {
+  private boolean cannotUseStatistics(
+      Set<Expression> expressions, Set<Expression> sourceTransformExpressions) 
{
     for (Expression expression : expressions) {
+
       if (expression instanceof FunctionExpression) {
-        if (!BuiltinAggregationFunction.canUseStatistics(
-            ((FunctionExpression) expression).getFunctionName())) {
+        FunctionExpression functionExpression = (FunctionExpression) 
expression;
+        if (COUNT_TIME.equalsIgnoreCase(functionExpression.getFunctionName())) 
{
+          String alignedDeviceId = "";
+          for (Expression countTimeExpression : sourceTransformExpressions) {
+            TimeSeriesOperand ts = (TimeSeriesOperand) countTimeExpression;
+            if (!(ts.getPath() instanceof AlignedPath
+                || ((MeasurementPath) ts.getPath()).isUnderAlignedEntity())) {
+              return true;
+            }
+            if (StringUtils.isEmpty(alignedDeviceId)) {
+              alignedDeviceId = ts.getPath().getDevice();
+            } else if 
(!alignedDeviceId.equalsIgnoreCase(ts.getPath().getDevice())) {
+              // count_time from only one aligned device can use 
AlignedSeriesAggScan
+              return true;
+            }
+          }
+          return false;
+        }
+
+        if 
(!BuiltinAggregationFunction.canUseStatistics(functionExpression.getFunctionName()))
 {
           return true;
         }
       } else {

Reply via email to