This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/multi_devices_fe
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/multi_devices_fe by 
this push:
     new 00415c95717 add template
00415c95717 is described below

commit 00415c95717042261ba7ac41b70cc42966c8b037
Author: Beyyes <[email protected]>
AuthorDate: Thu Oct 26 23:49:24 2023 +0800

    add template
---
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  62 ++++++--
 .../plan/analyze/TemplatedDeviceAnalyze.java       | 164 +++++++++++++++++++++
 .../queryengine/plan/execution/QueryExecution.java |   9 +-
 3 files changed, 217 insertions(+), 18 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index fc8f341d8d0..d40cd576319 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -211,10 +211,10 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
 
   private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
-  private static final Expression DEVICE_EXPRESSION =
+  protected static final Expression DEVICE_EXPRESSION =
       TimeSeriesOperand.constructColumnHeaderExpression(DEVICE, 
TSDataType.TEXT);
 
-  private static final Expression END_TIME_EXPRESSION =
+  protected static final Expression END_TIME_EXPRESSION =
       TimeSeriesOperand.constructColumnHeaderExpression(ENDTIME, 
TSDataType.INT64);
 
   private final List<String> lastQueryColumnNames =
@@ -251,6 +251,8 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
   public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext 
context) {
     Analysis analysis = new Analysis();
     analysis.setLastLevelUseWildcard(queryStatement.isLastLevelUseWildcard());
+
+    long startTime = System.currentTimeMillis();
     try {
       // check for semantic errors
       queryStatement.semanticCheck();
@@ -260,6 +262,10 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       }
 
       ISchemaTree schemaTree = analyzeSchema(queryStatement, analysis, 
context);
+
+      logger.warn("----- Analyze analyzeSchema cost: {}ms", 
System.currentTimeMillis() - startTime);
+      startTime = System.currentTimeMillis();
+
       // If there is no leaf node in the schema tree, the query should be 
completed immediately
       if (schemaTree.isEmpty()) {
         return finishQuery(queryStatement, analysis);
@@ -279,6 +285,8 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         // fe 线程管理
         //
         List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
+        logger.warn("----- Analyze analyzeFrom cost: {}ms", 
System.currentTimeMillis() - startTime);
+        startTime = System.currentTimeMillis();
 
         if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) {
           // remove the device which won't appear in resultSet after 
limit/offset
@@ -286,7 +294,18 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         }
 
         analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList);
-        outputExpressions = analyzeSelect(analysis, queryStatement, 
schemaTree, deviceList);
+        logger.warn(
+            "----- Analyze analyzeDeviceToWhere cost: {}ms",
+            System.currentTimeMillis() - startTime);
+        startTime = System.currentTimeMillis();
+
+        outputExpressions =
+            TemplatedDeviceAnalyze.analyzeSelect(analysis, queryStatement, 
schemaTree, deviceList);
+
+        logger.warn(
+            "----- Analyze analyzeSelect cost: {}ms", 
System.currentTimeMillis() - startTime);
+        startTime = System.currentTimeMillis();
+
         if (deviceList.isEmpty()) {
           return finishQuery(queryStatement, analysis);
         }
@@ -299,10 +318,18 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         analyzeDeviceToAggregation(analysis, queryStatement);
         analyzeDeviceToSourceTransform(analysis, queryStatement);
         analyzeDeviceToSource(analysis, queryStatement);
+        logger.warn(
+            "----- Analyze analyzeDeviceToSource cost: {}ms",
+            System.currentTimeMillis() - startTime);
+        startTime = System.currentTimeMillis();
 
         analyzeDeviceViewOutput(analysis, queryStatement);
         analyzeDeviceViewInput(analysis, queryStatement);
 
+        logger.warn(
+            "----- Analyze analyzeDeviceView cost: {}ms", 
System.currentTimeMillis() - startTime);
+        startTime = System.currentTimeMillis();
+
         analyzeInto(analysis, queryStatement, deviceList, outputExpressions);
       } else {
         Map<Integer, List<Pair<Expression, String>>> outputExpressionMap =
@@ -352,7 +379,10 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
 
       // fetch partition information
       analyzeDataPartition(analysis, queryStatement, schemaTree);
-
+      logger.warn(
+          "----- Analyze analyzeOutput+analyzeDataPartition cost: {}ms",
+          System.currentTimeMillis() - startTime);
+      startTime = System.currentTimeMillis();
     } catch (StatementAnalyzeException e) {
       throw new StatementAnalyzeException(
           "Meet error when analyzing the query statement: " + e.getMessage());
@@ -633,12 +663,14 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     Set<PartialPath> deviceSet = new HashSet<>();
     for (PartialPath devicePattern : devicePatternList) {
       // get all matched devices
+      // TODO: can isPrefixMatch be false? Can analyzeFrom return all devices of schemaTree directly?
       deviceSet.addAll(
-          schemaTree.getMatchedDevices(devicePattern).stream()
+          schemaTree.getMatchedDevices(devicePattern, false).stream()
               .map(DeviceSchemaInfo::getDevicePath)
               .collect(Collectors.toList()));
     }
 
+    // TODO: is sorting really necessary here? Is the final sourceNodeList already sorted?
     return queryStatement.getResultDeviceOrder() == Ordering.ASC
         ? deviceSet.stream().sorted().collect(Collectors.toList())
         : 
deviceSet.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
@@ -649,7 +681,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       Analysis analysis,
       QueryStatement queryStatement,
       ISchemaTree schemaTree,
-      List<PartialPath> deviceSet) {
+      List<PartialPath> deviceList) {
     List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
     Map<String, Set<Expression>> deviceToSelectExpressions = new HashMap<>();
     ColumnPaginationController paginationController =
@@ -663,7 +695,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       // use LinkedHashMap for order-preserving
       Map<Expression, Map<String, Expression>> 
measurementToDeviceSelectExpressions =
           new LinkedHashMap<>();
-      for (PartialPath device : deviceSet) {
+      for (PartialPath device : deviceList) {
         List<Expression> selectExpressionsOfOneDevice =
             concatDeviceAndBindSchemaForExpression(selectExpression, device, 
schemaTree);
         if (selectExpressionsOfOneDevice.isEmpty()) {
@@ -718,12 +750,12 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
 
     // remove devices without measurements to compute
     Set<PartialPath> noMeasurementDevices = new HashSet<>();
-    for (PartialPath device : deviceSet) {
+    for (PartialPath device : deviceList) {
       if (!deviceToSelectExpressions.containsKey(device.getFullPath())) {
         noMeasurementDevices.add(device);
       }
     }
-    deviceSet.removeAll(noMeasurementDevices);
+    deviceList.removeAll(noMeasurementDevices);
 
     // when the select expression of any device is empty,
     // the where expression map also need remove this device
@@ -745,7 +777,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     return outputExpressions;
   }
 
-  private void updateMeasurementToDeviceSelectExpressions(
+  protected static void updateMeasurementToDeviceSelectExpressions(
       Analysis analysis,
       Map<Expression, Map<String, Expression>> 
measurementToDeviceSelectExpressions,
       PartialPath device,
@@ -759,7 +791,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     }
   }
 
-  private void updateDeviceToSelectExpressions(
+  protected static void updateDeviceToSelectExpressions(
       Analysis analysis,
       Map<String, Set<Expression>> deviceToSelectExpressions,
       Map<String, Expression> deviceToSelectExpressionsOfOneMeasurement) {
@@ -777,7 +809,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     }
   }
 
-  private String analyzeAlias(
+  protected static String analyzeAlias(
       String resultColumnAlias,
       Expression rawExpression,
       Expression normalizedExpression,
@@ -1623,7 +1655,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     queryStatement.updateSortItems(orderByExpressions);
   }
 
-  private TSDataType analyzeExpressionType(Analysis analysis, Expression 
expression) {
+  private static TSDataType analyzeExpressionType(Analysis analysis, 
Expression expression) {
     return analyzeExpression(analysis, expression);
   }
 
@@ -2162,7 +2194,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
    * <p>an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align 
by device, return
    * false while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
    */
-  private void checkDataTypeConsistencyInAlignByDevice(
+  protected static void checkDataTypeConsistencyInAlignByDevice(
       Analysis analysis, List<Expression> expressions) {
     TSDataType checkedDataType = analysis.getType(expressions.get(0));
     for (Expression expression : expressions) {
@@ -2183,7 +2215,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     }
   }
 
-  private void checkAliasUniqueness(
+  protected static void checkAliasUniqueness(
       String alias, Map<Expression, Map<String, Expression>> 
measurementToDeviceSelectExpressions) {
     if (alias != null && measurementToDeviceSelectExpressions.keySet().size() 
> 1) {
       throw new SemanticException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
new file mode 100644
index 00000000000..888b65af453
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.analyze;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
+import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.DEVICE_EXPRESSION;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.END_TIME_EXPRESSION;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeAlias;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.checkAliasUniqueness;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.updateDeviceToSelectExpressions;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.updateMeasurementToDeviceSelectExpressions;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.toLowerCaseExpression;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression;
+
+/**
+ * This class provides an accelerated implementation of align-by-device queries over multiple
+ * devices. The optimization applies only when all queried devices share the same template, since
+ * a shared template makes many per-device checks unnecessary.
+ *
+ * <p>e.g. for the query `SELECT * FROM root.xx.** order by device/time/expression align by
+ * device`, all devices matched by `root.xx.**` must use the same template.
+ */
+public class TemplatedDeviceAnalyze {
+
+  protected static List<Pair<Expression, String>> analyzeSelect(
+      Analysis analysis,
+      QueryStatement queryStatement,
+      ISchemaTree schemaTree,
+      List<PartialPath> deviceList) {
+
+    Template template = 
ClusterTemplateManager.getInstance().getAllTemplates().get(0);
+
+    List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
+    Map<String, Set<Expression>> deviceToSelectExpressions = new HashMap<>();
+    ColumnPaginationController paginationController =
+        new ColumnPaginationController(
+            queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset(), 
false);
+
+    for (ResultColumn resultColumn : 
queryStatement.getSelectComponent().getResultColumns()) {
+      Expression selectExpression = resultColumn.getExpression();
+
+      // select expression after removing wildcard, LinkedHashMap for 
order-preserving
+      Map<Expression, Map<String, Expression>> 
measurementToDeviceSelectExpressions =
+          new LinkedHashMap<>();
+      for (PartialPath device : deviceList) {
+        List<Expression> selectExpressionsOfOneDevice =
+            concatDeviceAndBindSchemaForExpression(selectExpression, device, 
schemaTree);
+        if (selectExpressionsOfOneDevice.isEmpty()) {
+          continue;
+        }
+
+        updateMeasurementToDeviceSelectExpressions(
+            analysis, measurementToDeviceSelectExpressions, device, 
selectExpressionsOfOneDevice);
+      }
+
+      checkAliasUniqueness(resultColumn.getAlias(), 
measurementToDeviceSelectExpressions);
+
+      for (Map.Entry<Expression, Map<String, Expression>> entry :
+          measurementToDeviceSelectExpressions.entrySet()) {
+        Expression measurementExpression = entry.getKey();
+        Map<String, Expression> deviceToSelectExpressionsOfOneMeasurement = 
entry.getValue();
+
+        if (paginationController.hasCurOffset()) {
+          paginationController.consumeOffset();
+        } else if (paginationController.hasCurLimit()) {
+          deviceToSelectExpressionsOfOneMeasurement
+              .values()
+              .forEach(expression -> analyzeExpression(analysis, expression));
+
+          // Devices that use the same template must have consistent data types, so there is
+          // no need to call checkDataTypeConsistencyInAlignByDevice here.
+
+          Expression lowerCaseMeasurementExpression = 
toLowerCaseExpression(measurementExpression);
+          analyzeExpression(analysis, lowerCaseMeasurementExpression);
+
+          outputExpressions.add(
+              new Pair<>(
+                  lowerCaseMeasurementExpression,
+                  analyzeAlias(
+                      resultColumn.getAlias(),
+                      measurementExpression,
+                      lowerCaseMeasurementExpression,
+                      queryStatement)));
+
+          updateDeviceToSelectExpressions(
+              analysis, deviceToSelectExpressions, 
deviceToSelectExpressionsOfOneMeasurement);
+
+          paginationController.consumeLimit();
+        } else {
+          break;
+        }
+      }
+    }
+
+    removeDevicesWithoutMeasurements(deviceList, deviceToSelectExpressions, 
analysis);
+
+    Set<Expression> selectExpressions = new LinkedHashSet<>();
+    selectExpressions.add(DEVICE_EXPRESSION);
+    if (queryStatement.isOutputEndTime()) {
+      selectExpressions.add(END_TIME_EXPRESSION);
+    }
+    outputExpressions.forEach(pair -> selectExpressions.add(pair.getLeft()));
+    analysis.setSelectExpressions(selectExpressions);
+    analysis.setDeviceToSelectExpressions(deviceToSelectExpressions);
+
+    return outputExpressions;
+  }
+
+  private static void removeDevicesWithoutMeasurements(
+      List<PartialPath> deviceList,
+      Map<String, Set<Expression>> deviceToSelectExpressions,
+      Analysis analysis) {
+    // remove devices without measurements to compute
+    Set<PartialPath> noMeasurementDevices = new HashSet<>();
+    for (PartialPath device : deviceList) {
+      if (!deviceToSelectExpressions.containsKey(device.getFullPath())) {
+        noMeasurementDevices.add(device);
+      }
+    }
+    deviceList.removeAll(noMeasurementDevices);
+
+    // when the select expression of any device is empty,
+    // the where expression map also need remove this device
+    if (analysis.getDeviceToWhereExpression() != null) {
+      noMeasurementDevices.forEach(
+          devicePath -> 
analysis.getDeviceToWhereExpression().remove(devicePath.getFullPath()));
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 1fd4652a0bb..e9c68fd6464 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -217,10 +217,16 @@ public class QueryExecution implements IQueryExecution {
       return;
     }
 
+    long sTime = System.currentTimeMillis();
     // check timeout for query first
     checkTimeOutForQuery();
     doLogicalPlan();
+    logger.warn("----- doLogicalPlan cost: {}ms", System.currentTimeMillis() - 
sTime);
+    sTime = System.currentTimeMillis();
+
     doDistributedPlan();
+    logger.warn("----- doDistributedPlan cost: {}ms", 
System.currentTimeMillis() - sTime);
+
     // update timeout after finishing plan stage
     context.setTimeOut(
         context.getTimeOut() - (System.currentTimeMillis() - 
context.getStartTime()));
@@ -239,9 +245,6 @@ public class QueryExecution implements IQueryExecution {
     if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
       stateMachine.transitionToFailed(analysis.getFailStatus());
     }
-    logger.warn(
-        "~~~~ Consume time in doLogicalPlan+doDistributionPlan: {}ns",
-        System.nanoTime() - startTime);
   }
 
   private void checkTimeOutForQuery() {

Reply via email to