This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 51e6d17b821 [IOTDB-6120] Push down limit/offset in query with group by time
51e6d17b821 is described below
commit 51e6d17b82191e6661a91ee1696138480560e532
Author: YangCaiyin <[email protected]>
AuthorDate: Tue Aug 22 15:48:06 2023 +0800
[IOTDB-6120] Push down limit/offset in query with group by time
---
.../db/queryengine/plan/analyze/Analysis.java | 11 +
.../queryengine/plan/analyze/AnalyzeVisitor.java | 76 +++---
.../plan/optimization/LimitOffsetPushDown.java | 117 +++++++++
.../db/queryengine/plan/parser/ASTVisitor.java | 15 +-
.../plan/planner/LogicalPlanVisitor.java | 12 +-
.../plan/statement/component/OrderByComponent.java | 8 +-
.../plan/statement/crud/QueryStatement.java | 20 ++
.../plan/optimization/LimitOffsetPushDownTest.java | 271 +++++++++++++++++++++
8 files changed, 483 insertions(+), 47 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 1b900c0e425..0f9c4d223c6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -127,6 +127,9 @@ public class Analysis {
// Query Analysis (used in ALIGN BY DEVICE)
/////////////////////////////////////////////////////////////////////////////////////////////////
+ // the list of device names
+ private List<PartialPath> deviceList;
+
// map from output device name to queried devices
private Map<String, List<String>> outputDeviceToQueriedDevicesMap;
@@ -744,4 +747,12 @@ public class Analysis {
public void setLastLevelUseWildcard(boolean lastLevelUseWildcard) {
this.lastLevelUseWildcard = lastLevelUseWildcard;
}
+
+ public void setDeviceList(List<PartialPath> deviceList) {
+ this.deviceList = deviceList;
+ }
+
+ public List<PartialPath> getDeviceList() {
+ return deviceList;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 7887a5d71a9..92d2e85c1b9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -156,6 +156,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -182,6 +183,8 @@ import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHE
import static
org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils.constructTargetDevice;
import static
org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils.constructTargetMeasurement;
import static
org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils.constructTargetPath;
+import static
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetInGroupByTimeForDevice;
+import static
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice;
import static
org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.GetSourcePathsVisitor.getSourcePaths;
/** This visitor is used to analyze each type of Statement and returns the
{@link Analysis}. */
@@ -248,17 +251,23 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
List<Pair<Expression, String>> outputExpressions;
if (queryStatement.isAlignByDevice()) {
- Set<PartialPath> deviceSet = analyzeFrom(queryStatement, schemaTree);
+ List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
- analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceSet);
- outputExpressions = analyzeSelect(analysis, queryStatement,
schemaTree, deviceSet);
- if (deviceSet.isEmpty()) {
+ if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) {
+ // remove the device which won't appear in resultSet after
limit/offset
+ deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList,
queryStatement);
+ }
+
+ analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList);
+ outputExpressions = analyzeSelect(analysis, queryStatement,
schemaTree, deviceList);
+ if (deviceList.isEmpty()) {
return finishQuery(queryStatement, analysis);
}
+ analysis.setDeviceList(deviceList);
- analyzeDeviceToGroupBy(analysis, queryStatement, schemaTree,
deviceSet);
- analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree,
deviceSet);
- analyzeHaving(analysis, queryStatement, schemaTree, deviceSet);
+ analyzeDeviceToGroupBy(analysis, queryStatement, schemaTree,
deviceList);
+ analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree,
deviceList);
+ analyzeHaving(analysis, queryStatement, schemaTree, deviceList);
analyzeDeviceToAggregation(analysis, queryStatement);
analyzeDeviceToSourceTransform(analysis, queryStatement);
@@ -267,7 +276,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analyzeDeviceViewOutput(analysis, queryStatement);
analyzeDeviceViewInput(analysis, queryStatement);
- analyzeInto(analysis, queryStatement, deviceSet, outputExpressions);
+ analyzeInto(analysis, queryStatement, deviceList, outputExpressions);
} else {
Map<Integer, List<Pair<Expression, String>>> outputExpressionMap =
analyzeSelect(analysis, queryStatement, schemaTree);
@@ -389,15 +398,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
whereCondition.setPredicate(predicate);
}
}
- if (queryStatement.isGroupByTime()) {
- GroupByTimeComponent groupByTimeComponent =
queryStatement.getGroupByTimeComponent();
- Filter groupByFilter = initGroupByFilter(groupByTimeComponent);
- if (globalTimeFilter == null) {
- globalTimeFilter = groupByFilter;
- } else {
- globalTimeFilter = FilterFactory.and(globalTimeFilter, groupByFilter);
- }
- }
analysis.setGlobalTimeFilter(globalTimeFilter);
analysis.setHasValueFilter(hasValueFilter);
}
@@ -536,11 +536,11 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return outputExpressionMap;
}
- private Set<PartialPath> analyzeFrom(QueryStatement queryStatement,
ISchemaTree schemaTree) {
+ private List<PartialPath> analyzeFrom(QueryStatement queryStatement,
ISchemaTree schemaTree) {
// device path patterns in FROM clause
List<PartialPath> devicePatternList =
queryStatement.getFromComponent().getPrefixPaths();
- Set<PartialPath> deviceSet = new LinkedHashSet<>();
+ Set<PartialPath> deviceSet = new HashSet<>();
for (PartialPath devicePattern : devicePatternList) {
// get all matched devices
deviceSet.addAll(
@@ -548,21 +548,23 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
.map(DeviceSchemaInfo::getDevicePath)
.collect(Collectors.toList()));
}
- return deviceSet;
+
+ return queryStatement.getResultDeviceOrder() == Ordering.ASC
+ ? deviceSet.stream().sorted().collect(Collectors.toList())
+ :
deviceSet.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
}
private List<Pair<Expression, String>> analyzeSelect(
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- Set<PartialPath> deviceSet) {
+ List<PartialPath> deviceSet) {
List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
Map<String, Set<Expression>> deviceToSelectExpressions = new HashMap<>();
ColumnPaginationController paginationController =
new ColumnPaginationController(
queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset(),
false);
- Set<PartialPath> noMeasurementDevices = new HashSet<>(deviceSet);
for (ResultColumn resultColumn :
queryStatement.getSelectComponent().getResultColumns()) {
Expression selectExpression = resultColumn.getExpression();
@@ -578,7 +580,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
if (selectExpressionsOfOneDevice.isEmpty()) {
continue;
}
- noMeasurementDevices.remove(device);
updateMeasurementToDeviceSelectExpressions(
analysis, measurementToDeviceSelectExpressions, device,
selectExpressionsOfOneDevice);
}
@@ -626,6 +627,12 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
// remove devices without measurements to compute
+ Set<PartialPath> noMeasurementDevices = new HashSet<>();
+ for (PartialPath device : deviceSet) {
+ if (!deviceToSelectExpressions.containsKey(device.getFullPath())) {
+ noMeasurementDevices.add(device);
+ }
+ }
deviceSet.removeAll(noMeasurementDevices);
// when the select expression of any device is empty,
@@ -727,7 +734,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- Set<PartialPath> deviceSet) {
+ List<PartialPath> deviceSet) {
if (!queryStatement.hasHaving()) {
return;
}
@@ -1191,7 +1198,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- Set<PartialPath> deviceSet) {
+ List<PartialPath> deviceSet) {
if (!queryStatement.hasWhere()) {
return;
}
@@ -1448,7 +1455,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- Set<PartialPath> deviceSet) {
+ List<PartialPath> deviceSet) {
if (queryStatement.getGroupByComponent() == null) {
return;
}
@@ -1521,7 +1528,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- Set<PartialPath> deviceSet) {
+ List<PartialPath> deviceSet) {
if (!queryStatement.hasOrderByExpression()) {
return;
}
@@ -1681,6 +1688,10 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return;
}
+ if (queryStatement.isResultSetEmpty()) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ }
+
GroupByTimeComponent groupByTimeComponent =
queryStatement.getGroupByTimeComponent();
if ((groupByTimeComponent.isIntervalByMonth() ||
groupByTimeComponent.isSlidingStepByMonth())
&& queryStatement.getResultTimeOrder() == Ordering.DESC) {
@@ -1692,6 +1703,15 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
"The query time range should be specified in the GROUP BY TIME
clause.");
}
analysis.setGroupByTimeParameter(new
GroupByTimeParameter(groupByTimeComponent));
+
+ Filter globalTimeFilter = analysis.getGlobalTimeFilter();
+ Filter groupByFilter = initGroupByFilter(groupByTimeComponent);
+ if (globalTimeFilter == null) {
+ globalTimeFilter = groupByFilter;
+ } else {
+ globalTimeFilter = FilterFactory.and(globalTimeFilter, groupByFilter);
+ }
+ analysis.setGlobalTimeFilter(globalTimeFilter);
}
private void analyzeFill(Analysis analysis, QueryStatement queryStatement) {
@@ -1845,7 +1865,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
private void analyzeInto(
Analysis analysis,
QueryStatement queryStatement,
- Set<PartialPath> deviceSet,
+ List<PartialPath> deviceSet,
List<Pair<Expression, String>> outputExpressions) {
if (!queryStatement.isSelectInto()) {
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java
index 52c9b8514fe..37dd6ff17a5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.optimization;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer;
@@ -37,8 +38,13 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeri
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
+import
org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* <b>Optimization phase:</b> Distributed plan planning
*
@@ -239,4 +245,115 @@ public class LimitOffsetPushDown implements PlanOptimizer
{
return analysis;
}
}
+
+ // following methods are used to push down limit/offset in group by time
+
+ // 1. push down limit/offset to group by time in align by time
+
+ public static boolean canPushDownLimitOffsetToGroupByTime(QueryStatement
queryStatement) {
+ if (queryStatement.isGroupByTime()
+ && !queryStatement.isAlignByDevice()
+ && !queryStatement.hasHaving()
+ && !queryStatement.hasFill()) {
+ return !queryStatement.hasOrderBy() ||
queryStatement.isOrderByBasedOnTime();
+ }
+ return false;
+ }
+
+  /**
+   * Folds the row LIMIT/OFFSET of the statement into the start/end time of its GROUP BY TIME
+   * component, then clears the row LIMIT/OFFSET on the statement.
+   *
+   * <p>Windows are numbered {@code 0..size-1} from the original start time, one per sliding step.
+   * With ascending time order OFFSET skips the first windows; with descending time order it skips
+   * the last ones and LIMIT keeps the windows immediately before them. If OFFSET is not smaller
+   * than the number of windows, the statement is flagged as having an empty result set and the
+   * time range is left untouched.
+   *
+   * @param queryStatement a statement with a GROUP BY TIME component; its time range, row limit,
+   *     row offset and result-set-empty flag may be modified
+   */
+  public static void pushDownLimitOffsetToTimeParameter(QueryStatement queryStatement) {
+    GroupByTimeComponent groupByTimeComponent = queryStatement.getGroupByTimeComponent();
+    long startTime = groupByTimeComponent.getStartTime();
+    long endTime = groupByTimeComponent.getEndTime();
+    long step = groupByTimeComponent.getSlidingStep();
+    long interval = groupByTimeComponent.getInterval();
+    long limitSize = queryStatement.getRowLimit();
+    long offsetSize = queryStatement.getRowOffset();
+
+    // number of windows: ceil((endTime - startTime) / step)
+    long size = (endTime - startTime + step - 1) / step;
+    if (size > offsetSize) {
+      // windows left once OFFSET rows are skipped; always positive in this branch
+      long remaining = size - offsetSize;
+      // a limit of 0 means "no LIMIT"; a LIMIT beyond the remaining windows trims nothing.
+      // Guarding on this also keeps (limitSize - 1) * step from overflowing for huge limits.
+      boolean limitTrims = limitSize != 0 && limitSize <= remaining;
+
+      long newStartTime;
+      long newEndTime;
+      if (queryStatement.getResultTimeOrder() == Ordering.ASC) {
+        newStartTime = startTime + offsetSize * step;
+        newEndTime =
+            limitTrims
+                ? Math.min(endTime, newStartTime + (limitSize - 1) * step + interval)
+                : endTime;
+      } else {
+        // descending order: OFFSET drops windows from the tail, so the end time shrinks;
+        // the start time only moves when LIMIT really cuts off leading windows
+        long lastWindow = remaining - 1;
+        long firstWindow = limitTrims ? remaining - limitSize : 0;
+        newStartTime = startTime + firstWindow * step;
+        newEndTime = Math.min(endTime, startTime + lastWindow * step + interval);
+      }
+      // NOTE(review): when interval > step the shrunken range may still admit one more
+      // (truncated) window starting before newEndTime - confirm against the window generator.
+      groupByTimeComponent.setEndTime(newEndTime);
+      groupByTimeComponent.setStartTime(newStartTime);
+    } else {
+      // finish the query, resultSet is empty
+      queryStatement.setResultSetEmpty(true);
+    }
+    queryStatement.setRowLimit(0);
+    queryStatement.setRowOffset(0);
+  }
+
+ // 2. push down limit/offset to group by time in align by device
+ public static boolean canPushDownLimitOffsetInGroupByTimeForDevice(
+ QueryStatement queryStatement) {
+ if (!hasLimitOffset(queryStatement)) {
+ return false;
+ }
+
+ if (queryStatement.isGroupByTime()
+ && queryStatement.isAlignByDevice()
+ && !queryStatement.hasHaving()
+ && !queryStatement.hasFill()) {
+ return !queryStatement.hasOrderBy() ||
queryStatement.isOrderByBasedOnDevice();
+ }
+ return false;
+ }
+
+  /**
+   * Drops the devices that cannot contribute any row to the result once the row LIMIT/OFFSET is
+   * applied, treating the result as {@code size} rows (time windows) per device, laid out device
+   * by device in the order of {@code deviceNames}.
+   *
+   * <p>The row offset of the statement is reduced by the rows of the dropped leading devices. If
+   * exactly one device remains and {@code isOrderByTimeInDevices()} holds, the remaining
+   * LIMIT/OFFSET is further pushed into the GROUP BY TIME range. If OFFSET covers every row, the
+   * statement is flagged as having an empty result set and {@code deviceNames} is returned as is.
+   *
+   * @param deviceNames candidate devices in result order
+   * @param queryStatement the statement being analyzed; its row offset, row limit, GROUP BY TIME
+   *     range and result-set-empty flag may be modified
+   * @return a new list holding the devices that are still needed, or {@code deviceNames} itself
+   *     when the result set is empty
+   */
+  public static List<PartialPath> pushDownLimitOffsetInGroupByTimeForDevice(
+      List<PartialPath> deviceNames, QueryStatement queryStatement) {
+    GroupByTimeComponent groupByTimeComponent = queryStatement.getGroupByTimeComponent();
+    long startTime = groupByTimeComponent.getStartTime();
+    long endTime = groupByTimeComponent.getEndTime();
+    long step = groupByTimeComponent.getSlidingStep();
+
+    // number of rows (time windows) each device contributes
+    long size = (endTime - startTime + step - 1) / step;
+    int deviceCount = deviceNames.size();
+    if (size == 0 || size * deviceCount <= queryStatement.getRowOffset()) {
+      // resultSet is empty
+      queryStatement.setResultSetEmpty(true);
+      return deviceNames;
+    }
+
+    long limitSize = queryStatement.getRowLimit();
+    long offsetSize = queryStatement.getRowOffset();
+
+    // offsetSize < size * deviceCount here, so the quotient is a valid list index
+    int startDeviceIndex = (int) (offsetSize / size);
+    // rows of the first kept device that survive the offset, between 1 and size
+    long rowsInStartDevice = (startDeviceIndex + 1) * size - offsetSize;
+
+    // Index of the last device LIMIT can reach. Kept as a long and computed without adding to
+    // limitSize first, so a very large LIMIT can neither overflow nor wrap to a negative int.
+    long endDeviceIndex;
+    if (limitSize == 0) {
+      endDeviceIndex = deviceCount - 1L;
+    } else if (limitSize <= rowsInStartDevice) {
+      endDeviceIndex = startDeviceIndex;
+    } else {
+      endDeviceIndex = startDeviceIndex + (limitSize - rowsInStartDevice - 1) / size + 1;
+    }
+
+    queryStatement.setRowOffset(offsetSize - startDeviceIndex * size);
+
+    List<PartialPath> optimizedDeviceNames = new ArrayList<>();
+    if (startDeviceIndex == endDeviceIndex) {
+      // if only refer to one device, optimize the time parameter
+      optimizedDeviceNames.add(deviceNames.get(startDeviceIndex));
+      if (hasLimitOffset(queryStatement) && queryStatement.isOrderByTimeInDevices()) {
+        pushDownLimitOffsetToTimeParameter(queryStatement);
+      }
+    } else {
+      // LIMIT may reach past the last device; clamp before copying the kept range
+      int lastIndex = (int) Math.min(endDeviceIndex, deviceCount - 1L);
+      optimizedDeviceNames.addAll(deviceNames.subList(startDeviceIndex, lastIndex + 1));
+    }
+    return optimizedDeviceNames;
+  }
+
+  /** Returns {@code true} if the statement has a row LIMIT or a row OFFSET. */
+  private static boolean hasLimitOffset(QueryStatement queryStatement) {
+    if (queryStatement.hasLimit()) {
+      return true;
+    }
+    return queryStatement.hasOffset();
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 7d74eeed2de..72ab30672b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -226,6 +226,8 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_RESULT_NODES;
+import static
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetToGroupByTime;
+import static
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetToTimeParameter;
import static org.apache.iotdb.db.utils.constant.SqlConstant.CAST_FUNCTION;
import static org.apache.iotdb.db.utils.constant.SqlConstant.CAST_TYPE;
import static org.apache.iotdb.db.utils.constant.SqlConstant.REPLACE_FROM;
@@ -1360,6 +1362,11 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
queryStatement.setFillComponent(parseFillClause(ctx.fillClause()));
}
+ // parse ALIGN BY
+ if (ctx.alignByClause() != null) {
+ queryStatement.setResultSetFormat(parseAlignBy(ctx.alignByClause()));
+ }
+
if (ctx.paginationClause() != null) {
// parse SLIMIT & SOFFSET
if (ctx.paginationClause().seriesPaginationClause() != null) {
@@ -1383,14 +1390,12 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
queryStatement.setRowOffset(
parseOffsetClause(ctx.paginationClause().rowPaginationClause().offsetClause()));
}
+ if (canPushDownLimitOffsetToGroupByTime(queryStatement)) {
+ pushDownLimitOffsetToTimeParameter(queryStatement);
+ }
}
}
- // parse ALIGN BY
- if (ctx.alignByClause() != null) {
- queryStatement.setResultSetFormat(parseAlignBy(ctx.alignByClause()));
- }
-
queryStatement.setUseWildcard(useWildcard);
queryStatement.setLastLevelUseWildcard(lastLevelUseWildcard);
return queryStatement;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 372afb78488..9f28ec21026 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -51,7 +51,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.PipeEnriched
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
-import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
@@ -89,12 +88,11 @@ import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
/**
* This visitor is used to generate a logical plan for the statement and
returns the {@link
@@ -135,11 +133,9 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
}
if (queryStatement.isAlignByDevice()) {
- Map<String, PlanNode> deviceToSubPlanMap =
- queryStatement.getResultDeviceOrder() == Ordering.ASC
- ? new TreeMap<>()
- : new TreeMap<>(Collections.reverseOrder());
- for (String deviceName :
analysis.getDeviceToSourceExpressions().keySet()) {
+ Map<String, PlanNode> deviceToSubPlanMap = new LinkedHashMap<>();
+ for (PartialPath device : analysis.getDeviceList()) {
+ String deviceName = device.getFullPath();
LogicalPlanBuilder subPlanBuilder = new LogicalPlanBuilder(analysis,
context);
subPlanBuilder =
subPlanBuilder.withNewRoot(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/component/OrderByComponent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/component/OrderByComponent.java
index 7419d20e5ab..8774db378f5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/component/OrderByComponent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/component/OrderByComponent.java
@@ -124,12 +124,8 @@ public class OrderByComponent extends StatementNode {
return sortItemList.get(deviceOrderPriority).getOrdering();
}
- public boolean isDeviceOrderInitialized() {
- return deviceOrderPriority != -1;
- }
-
- public boolean isTimeOrderInitialized() {
- return timeOrderPriority != -1;
+ public int getTimeOrderPriority() {
+ return timeOrderPriority;
}
public String toSQLString() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/QueryStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/QueryStatement.java
index 8179eda9fbd..ae363342256 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/QueryStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/QueryStatement.java
@@ -118,6 +118,11 @@ public class QueryStatement extends Statement {
// can use statistics to skip
private boolean lastLevelUseWildcard = false;
+  // Used by the limit/offset push-down optimization: if pushing LIMIT/OFFSET down (in
+  // ASTVisitor, or in AnalyzeVisitor for ALIGN BY DEVICE) leaves an empty result set,
+  // we can skip the query.
+ private boolean isResultSetEmpty = false;
+
public QueryStatement() {
this.statementType = StatementType.QUERY;
}
@@ -195,6 +200,14 @@ public class QueryStatement extends Statement {
this.rowOffset = rowOffset;
}
+ public boolean isResultSetEmpty() {
+ return isResultSetEmpty;
+ }
+
+ public void setResultSetEmpty(boolean resultSetEmpty) {
+ isResultSetEmpty = resultSetEmpty;
+ }
+
public long getSeriesLimit() {
return seriesLimit;
}
@@ -341,6 +354,13 @@ public class QueryStatement extends Statement {
return orderByComponent != null && orderByComponent.isOrderByTime();
}
+  /**
+   * Tells whether rows inside each device are ordered by time.
+   *
+   * @return {@code true} when there is no ORDER BY at all, or when the ORDER BY is based on
+   *     device and either consists of a single sort item or has time at priority 1
+   */
+  public boolean isOrderByTimeInDevices() {
+    if (orderByComponent == null) {
+      return true;
+    }
+    if (!orderByComponent.isBasedOnDevice()) {
+      return false;
+    }
+    return orderByComponent.getSortItemList().size() == 1
+        || orderByComponent.getTimeOrderPriority() == 1;
+  }
+
public boolean isOrderByTimeseries() {
return orderByComponent != null && orderByComponent.isOrderByTimeseries();
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
index afbef764c10..ff2b70a124b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
@@ -33,8 +33,11 @@ import
org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
+import
org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.Assert;
@@ -296,4 +299,272 @@ public class LimitOffsetPushDownTest {
Assert.assertEquals(
actualPlan, new LimitOffsetPushDown().optimize(actualPlan, analysis,
context));
}
+
+ // test for limit/offset push down in group by time
+ @Test
+ public void testGroupByTimePushDown() {
+ String sql = "select avg(s1),sum(s2) from root.** group by ((1, 899],
200ms) offset 1 limit 2";
+ checkGroupByTimePushDown(sql, 201, 601, 0, 0);
+ }
+
+ @Test
+ public void testGroupByTimePushDown2() {
+ String sql = "select avg(s1),sum(s2) from root.** group by ([4, 899),
200ms) offset 2 limit 3";
+ checkGroupByTimePushDown(sql, 404, 899, 0, 0);
+ }
+
+ @Test
+ public void testGroupByTimePushDown3() {
+ String sql = "select avg(s1),sum(s2) from root.** group by ([4, 899),
88ms) offset 2 limit 3";
+ checkGroupByTimePushDown(sql, 180, 444, 0, 0);
+ }
+
+ @Test
+ public void testGroupByTimePushDown4() {
+ String sql =
+ "select avg(s1),sum(s2) from root.** group by ([4, 899), 88ms) order
by time offset 2 limit 3";
+ checkGroupByTimePushDown(sql, 180, 444, 0, 0);
+ }
+
+ @Test
+ public void testGroupByTimePushDown5() {
+ String sql =
+ "select avg(s1),sum(s2) from root.** group by ([4, 899), 88ms) order
by time desc offset 2 limit 3";
+ checkGroupByTimePushDown(sql, 532, 796, 0, 0);
+ }
+
+ @Test
+ public void testGroupByTimePushDown6() {
+ String sql =
+ "select avg(s1),sum(s2) from root.** group by ([4, 899), 100ms) order
by time desc offset 2 limit 3";
+ checkGroupByTimePushDown(sql, 404, 704, 0, 0);
+ }
+
+ @Test
+ public void testGroupByTimePushDown7() {
+ String sql =
+ "select avg(s1),sum(s2) from root.** group by ([4, 899), 50ms) order
by time desc offset 2 limit 3";
+ checkGroupByTimePushDown(sql, 654, 804, 0, 0);
+ }
+
+ @Test
+ public void testGroupByTimePushDown8() {
+ String sql =
+ "select avg(s1),sum(s2) from root.** group by ([0, 900), 100ms) order
by time desc offset 2 limit 2";
+ checkGroupByTimePushDown(sql, 500, 700, 0, 0);
+ }
+
+ @Test
+ public void testGroupByTimePushDown9() {
+ String sql =
+ "select avg(s1),sum(s2) from root.** group by ([4, 899), 50ms) order
by s1 offset 2 limit 3";
+ checkGroupByTimePushDown(sql, 4, 899, 3, 2);
+ }
+
+ @Test
+ public void testGroupByTimePushDown10() {
+ String sql =
+ "select avg(s1),sum(s2) from root.** group by ([4, 899), 50ms, 25ms)
offset 2 limit 3";
+ checkGroupByTimePushDown(sql, 54, 154, 0, 0);
+ }
+
+ @Test
+ public void testGroupByTimePushDown11() {
+ String sql =
+ "select avg(s1),sum(s2) from root.** group by ([4, 899), 50ms, 75ms)
offset 2 limit 3";
+ checkGroupByTimePushDown(sql, 154, 354, 0, 0);
+ }
+
+  /**
+   * Parses {@code sql} and checks the row limit/offset left on the statement as well as the
+   * start/end time of its GROUP BY TIME component.
+   */
+  private void checkGroupByTimePushDown(
+      String sql, long startTime, long endTime, long rowLimit, long rowOffset) {
+    Statement parsed = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
+    QueryStatement query = (QueryStatement) parsed;
+    Assert.assertEquals(rowLimit, query.getRowLimit());
+    Assert.assertEquals(rowOffset, query.getRowOffset());
+
+    GroupByTimeComponent timeComponent = query.getGroupByTimeComponent();
+    Assert.assertEquals(startTime, timeComponent.getStartTime());
+    Assert.assertEquals(endTime, timeComponent.getEndTime());
+  }
+
+  /**
+   * Parses and analyzes {@code sql}, then checks the row limit/offset left on the statement, the
+   * device list kept in the analysis (same devices, same order, same length) and the start/end
+   * time of the resulting GROUP BY TIME parameter.
+   *
+   * <p>Devices used by these tests: [root.sg.d1, root.sg.d2, root.sg.d2.a]
+   */
+  private void checkGroupByTimePushDownInAlignByDevice(
+      String sql,
+      List<String> deviceSet,
+      long rowLimit,
+      long rowOffset,
+      long startTime,
+      long endTime) {
+    QueryStatement statement =
+        (QueryStatement) StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
+    MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+    Analyzer analyzer =
+        new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
+    Analysis analysis = analyzer.analyze(statement);
+
+    Assert.assertEquals(rowLimit, statement.getRowLimit());
+    Assert.assertEquals(rowOffset, statement.getRowOffset());
+
+    List<PartialPath> deviceSetInAnalysis = analysis.getDeviceList();
+    // compare lengths first so that a missing or extra device fails the assertion instead of
+    // passing silently or surfacing as an IndexOutOfBoundsException
+    Assert.assertEquals(deviceSet.size(), deviceSetInAnalysis.size());
+    for (int i = 0; i < deviceSet.size(); i++) {
+      Assert.assertEquals(deviceSet.get(i), deviceSetInAnalysis.get(i).getFullPath());
+    }
+
+    GroupByTimeParameter groupByTimeParameter = analysis.getGroupByTimeParameter();
+    Assert.assertEquals(startTime, groupByTimeParameter.getStartTime());
+    Assert.assertEquals(endTime, groupByTimeParameter.getEndTime());
+  }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 899), 50ms) offset 16 limit
2 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d1");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 804, 899);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice2() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 899), 50ms) offset 16 limit
10 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d1");
+ deviceSet.add("root.sg.d2");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 10, 16, 4, 899);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice3() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 899), 50ms) offset 20 limit
2 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 104, 204);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice4() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 899), 50ms) offset 33 limit
5 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2");
+ deviceSet.add("root.sg.d2.a");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 5, 15, 4, 899);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice5() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) offset 9
limit 5 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 29, 179);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice6() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) offset 9
limit 9 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2");
+ deviceSet.add("root.sg.d2.a");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 9, 1, 4, 199);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice7() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) offset 9
limit 9 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2");
+ deviceSet.add("root.sg.d2.a");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 9, 1, 4, 199);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice8() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by
device offset 9 limit 9 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2");
+ deviceSet.add("root.sg.d2.a");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 9, 1, 4, 199);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice9() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by
device desc offset 9 limit 9 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2");
+ deviceSet.add("root.sg.d1");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 9, 1, 4, 199);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice10() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by
device, time desc offset 9 limit 5 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 54, 199);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice11() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by
device desc, time desc offset 9 limit 5 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 54, 199);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice12() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 899), 50ms) order by device
desc offset 16 limit 2 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2.a");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 804, 899);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice13() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by
device, avg(s1) desc offset 9 limit 5 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 5, 1, 4, 199);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice14() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by
device, avg(s1) desc,time desc offset 9 limit 5 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 5, 1, 4, 199);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice15() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by
device desc limit 1 offset 8 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 4, 54);
+ }
+
+ @Test
+ public void testGroupByTimePushDownInAlignByDevice16() {
+ String sql =
+ "select avg(s1) from root.** group by ([4, 199), 50ms, 25ms) order by
device desc offset 8 align by device";
+ List<String> deviceSet = new ArrayList<>();
+ deviceSet.add("root.sg.d2");
+ deviceSet.add("root.sg.d1");
+ checkGroupByTimePushDownInAlignByDevice(sql, deviceSet, 0, 0, 4, 199);
+ }
}