This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 790e57ff71 [IOTDB-3283] Implement Analyzer & LogicalPlanner for last query (#6017)
790e57ff71 is described below

commit 790e57ff7171aea20066033caaf6cda6074c6c7d
Author: liuminghui233 <[email protected]>
AuthorDate: Thu May 26 11:44:59 2022 +0800

    [IOTDB-3283] Implement Analyzer & LogicalPlanner for last query (#6017)
---
 .../iotdb/db/mpp/common/header/HeaderConstant.java | 19 ++++++-
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 59 +++++++++++++++++-----
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    | 23 +++++++++
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  |  6 +++
 .../plan/node/process/LastQueryMergeNode.java      |  4 +-
 .../plan/node/source/AlignedLastQueryScanNode.java |  4 +-
 .../plan/node/source/LastQueryScanNode.java        |  6 +--
 .../db/mpp/plan/plan/QueryLogicalPlanUtil.java     | 43 ++++++++++++++++
 8 files changed, 143 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java
index 875bb50428..e9951b13ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java
@@ -27,6 +27,8 @@ import java.util.Collections;
 public class HeaderConstant {
 
   // column names for query statement
+  public static final String COLUMN_TIME = "Time";
+  public static final String COLUMN_VALUE = "value";
   public static final String COLUMN_DEVICE = "Device";
 
   // column names for schema statement
@@ -71,6 +73,9 @@ public class HeaderConstant {
   public static final DatasetHeader countTimeSeriesHeader;
   public static final DatasetHeader countLevelTimeSeriesHeader;
 
+  // dataset header for last query
+  public static final DatasetHeader LAST_QUERY_HEADER;
+
   static {
     countStorageGroupHeader =
         new DatasetHeader(
@@ -140,9 +145,19 @@ public class HeaderConstant {
             true);
     showChildPathsHeader =
         new DatasetHeader(
-            Arrays.asList(new ColumnHeader(COLUMN_CHILDPATHS, 
TSDataType.TEXT)), true);
+            Collections.singletonList(new ColumnHeader(COLUMN_CHILDPATHS, 
TSDataType.TEXT)), true);
     showChildNodesHeader =
         new DatasetHeader(
-            Arrays.asList(new ColumnHeader(COLUMN_CHILDNODES, 
TSDataType.TEXT)), true);
+            Collections.singletonList(new ColumnHeader(COLUMN_CHILDNODES, 
TSDataType.TEXT)), true);
+  }
+
+  static {
+    LAST_QUERY_HEADER =
+        new DatasetHeader(
+            Arrays.asList(
+                new ColumnHeader(COLUMN_TIMESERIES, TSDataType.TEXT),
+                new ColumnHeader(COLUMN_VALUE, TSDataType.TEXT),
+                new ColumnHeader(COLUMN_TIMESERIES_DATATYPE, TSDataType.TEXT)),
+            false);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index f1176643a9..dc168a55a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
@@ -37,6 +38,7 @@ import 
org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FilterNullParameter;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -139,9 +141,9 @@ public class Analyzer {
 
         // concat path and construct path pattern tree
         PathPatternTree patternTree = new PathPatternTree();
-        QueryStatement rewrittenStatement =
+        queryStatement =
             (QueryStatement) new ConcatPathRewriter().rewrite(queryStatement, 
patternTree);
-        analysis.setStatement(rewrittenStatement);
+        analysis.setStatement(queryStatement);
 
         // request schema fetch API
         logger.info("{} fetch query schema...", getLogHeader());
@@ -160,6 +162,34 @@ public class Analyzer {
         analysis.setGlobalTimeFilter(globalTimeFilter);
         analysis.setHasValueFilter(hasValueFilter);
 
+        if (queryStatement.isLastQuery()) {
+          if (hasValueFilter) {
+            throw new SemanticException("Only time filters are supported in 
LAST query");
+          }
+
+          List<MeasurementPath> allSelectedPath = 
schemaTree.getAllMeasurement();
+          Set<Expression> sourceExpressions =
+              allSelectedPath.stream()
+                  .map(TimeSeriesOperand::new)
+                  .collect(Collectors.toCollection(LinkedHashSet::new));
+          sourceExpressions.forEach(
+              expression -> ExpressionAnalyzer.updateTypeProvider(expression, 
typeProvider));
+          analysis.setSourceExpressions(sourceExpressions);
+
+          analysis.setRespDatasetHeader(HeaderConstant.LAST_QUERY_HEADER);
+          typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES, 
TSDataType.TEXT);
+          typeProvider.setType(HeaderConstant.COLUMN_VALUE, TSDataType.TEXT);
+          typeProvider.setType(HeaderConstant.COLUMN_TIMESERIES_DATATYPE, 
TSDataType.TEXT);
+
+          Set<String> deviceSet =
+              sourceExpressions.stream()
+                  .map(ExpressionAnalyzer::getDeviceNameInSourceExpression)
+                  .collect(Collectors.toSet());
+          DataPartition dataPartition = fetchDataPartitionByDevices(deviceSet, 
schemaTree);
+          analysis.setDataPartitionInfo(dataPartition);
+          return analysis;
+        }
+
         // Example 1: select s1, s1 + s2 as t, udf(udf(s1)) from root.sg.d1
         //   outputExpressions: [<root.sg.d1.s1,null>, <root.sg.d1.s1 + 
root.sg.d1.s2,t>,
         //                       <udf(udf(root.sg.d1.s1)),null>]
@@ -390,16 +420,7 @@ public class Analyzer {
             
deviceSet.add(ExpressionAnalyzer.getDeviceNameInSourceExpression(expression));
           }
         }
-        Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = 
new HashMap<>();
-        for (String devicePath : deviceSet) {
-          DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
-          queryParam.setDevicePath(devicePath);
-          sgNameToQueryParamsMap
-              .computeIfAbsent(
-                  schemaTree.getBelongedStorageGroup(devicePath), key -> new 
ArrayList<>())
-              .add(queryParam);
-        }
-        DataPartition dataPartition = 
partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+        DataPartition dataPartition = fetchDataPartitionByDevices(deviceSet, 
schemaTree);
         analysis.setDataPartitionInfo(dataPartition);
       } catch (StatementAnalyzeException e) {
         logger.error("Meet error when analyzing the query statement: ", e);
@@ -772,6 +793,20 @@ public class Analyzer {
       return new DatasetHeader(columnHeaders, isIgnoreTimestamp);
     }
 
+    private DataPartition fetchDataPartitionByDevices(
+        Set<String> deviceSet, SchemaTree schemaTree) {
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new 
HashMap<>();
+      for (String devicePath : deviceSet) {
+        DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
+        queryParam.setDevicePath(devicePath);
+        sgNameToQueryParamsMap
+            .computeIfAbsent(
+                schemaTree.getBelongedStorageGroup(devicePath), key -> new 
ArrayList<>())
+            .add(queryParam);
+      }
+      return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+    }
+
     /**
      * Check datatype consistency in ALIGN BY DEVICE.
      *
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 68b0a39237..d60d210118 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -54,12 +54,15 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTimeNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
@@ -137,6 +140,26 @@ public class LogicalPlanBuilder {
     return this;
   }
 
+  public LogicalPlanBuilder planLast(Set<Expression> sourceExpressions, Filter 
globalTimeFilter) {
+    List<PlanNode> sourceNodeList = new ArrayList<>();
+    for (Expression sourceExpression : sourceExpressions) {
+      MeasurementPath selectPath =
+          (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath();
+      if (selectPath.isUnderAlignedEntity()) {
+        sourceNodeList.add(
+            new AlignedLastQueryScanNode(
+                context.getQueryId().genPlanNodeId(), new 
AlignedPath(selectPath)));
+      } else {
+        sourceNodeList.add(new 
LastQueryScanNode(context.getQueryId().genPlanNodeId(), selectPath));
+      }
+    }
+
+    this.root =
+        new LastQueryMergeNode(
+            context.getQueryId().genPlanNodeId(), sourceNodeList, 
globalTimeFilter);
+    return this;
+  }
+
   public LogicalPlanBuilder planAggregationSource(
       Set<Expression> sourceExpressions,
       OrderBy scanOrder,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index e3f332b877..0e6da8c705 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -118,6 +118,12 @@ public class LogicalPlanner {
     public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext 
context) {
       LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
 
+      if (queryStatement.isLastQuery()) {
+        return planBuilder
+            .planLast(analysis.getSourceExpressions(), 
analysis.getGlobalTimeFilter())
+            .getRoot();
+      }
+
       if (queryStatement.isAlignByDevice()) {
         Map<String, PlanNode> deviceToSubPlanMap = new HashMap<>();
         for (String deviceName : 
analysis.getDeviceToSourceExpressions().keySet()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
index 5923bc8942..144ea7dc76 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
@@ -33,7 +33,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-import static 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_COLUMN_HEADERS;
+import static 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
 
 public class LastQueryMergeNode extends ProcessNode {
 
@@ -76,7 +76,7 @@ public class LastQueryMergeNode extends ProcessNode {
 
   @Override
   public List<String> getOutputColumnNames() {
-    return LAST_QUERY_COLUMN_HEADERS;
+    return LAST_QUERY_HEADER_COLUMNS;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
index dced79aa0a..66c8a54f34 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
@@ -32,7 +32,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
 
-import static 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_COLUMN_HEADERS;
+import static 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
 
 public class AlignedLastQueryScanNode extends SourceNode {
   // The path of the target series which will be scanned.
@@ -91,7 +91,7 @@ public class AlignedLastQueryScanNode extends SourceNode {
 
   @Override
   public List<String> getOutputColumnNames() {
-    return LAST_QUERY_COLUMN_HEADERS;
+    return LAST_QUERY_HEADER_COLUMNS;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
index ab20f9548c..be97c5187d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
@@ -34,8 +34,8 @@ import java.util.Objects;
 
 public class LastQueryScanNode extends SourceNode {
 
-  public static final List<String> LAST_QUERY_COLUMN_HEADERS =
-      ImmutableList.of("Time", "timeseries", "value", "dataType");
+  public static final List<String> LAST_QUERY_HEADER_COLUMNS =
+      ImmutableList.of("timeseries", "value", "dataType");
 
   // The path of the target series which will be scanned.
   private final MeasurementPath seriesPath;
@@ -97,7 +97,7 @@ public class LastQueryScanNode extends SourceNode {
 
   @Override
   public List<String> getOutputColumnNames() {
-    return LAST_QUERY_COLUMN_HEADERS;
+    return LAST_QUERY_HEADER_COLUMNS;
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index 2cb8ddbb63..e58bf4af47 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -37,11 +37,14 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
@@ -103,6 +106,46 @@ public class QueryLogicalPlanUtil {
     }
   }
 
+  /* Last Query */
+  static {
+    String sql = "SELECT last * FROM root.sg.** WHERE time > 100";
+
+    QueryId queryId = new QueryId("test");
+    List<PlanNode> sourceNodeList = new ArrayList<>();
+    sourceNodeList.add(
+        new LastQueryScanNode(
+            queryId.genPlanNodeId(), (MeasurementPath) 
schemaMap.get("root.sg.d1.s3")));
+    sourceNodeList.add(
+        new LastQueryScanNode(
+            queryId.genPlanNodeId(), (MeasurementPath) 
schemaMap.get("root.sg.d1.s1")));
+    sourceNodeList.add(
+        new LastQueryScanNode(
+            queryId.genPlanNodeId(), (MeasurementPath) 
schemaMap.get("root.sg.d1.s2")));
+    sourceNodeList.add(
+        new LastQueryScanNode(
+            queryId.genPlanNodeId(), (MeasurementPath) 
schemaMap.get("root.sg.d2.s4")));
+    sourceNodeList.add(
+        new AlignedLastQueryScanNode(
+            queryId.genPlanNodeId(),
+            new AlignedPath((MeasurementPath) 
schemaMap.get("root.sg.d2.a.s1"))));
+    sourceNodeList.add(
+        new AlignedLastQueryScanNode(
+            queryId.genPlanNodeId(),
+            new AlignedPath((MeasurementPath) 
schemaMap.get("root.sg.d2.a.s2"))));
+    sourceNodeList.add(
+        new LastQueryScanNode(
+            queryId.genPlanNodeId(), (MeasurementPath) 
schemaMap.get("root.sg.d2.s1")));
+    sourceNodeList.add(
+        new LastQueryScanNode(
+            queryId.genPlanNodeId(), (MeasurementPath) 
schemaMap.get("root.sg.d2.s2")));
+
+    LastQueryMergeNode lastQueryMergeNode =
+        new LastQueryMergeNode(queryId.genPlanNodeId(), sourceNodeList, 
TimeFilter.gt(100));
+
+    querySQLs.add(sql);
+    sqlToPlanMap.put(sql, lastQueryMergeNode);
+  }
+
   /* Simple Query */
   static {
     String sql = "SELECT ** FROM root.sg.d2 LIMIT 10 OFFSET 10";

Reply via email to