This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/agg_query_level in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b429e4c776b892646d95bbd5ebe98a8ab34227f0 Author: Jinrui.Zhang <[email protected]> AuthorDate: Thu Jul 21 18:48:54 2022 +0800 fix the issue that GroupByLevel cannot be used with Value Filter --- .../iotdb/db/mpp/plan/execution/QueryExecution.java | 10 +++++----- .../planner/distribution/DistributionPlanner.java | 1 + .../plan/planner/distribution/SourceRewriter.java | 20 ++++++++++++++++++++ 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 4534b7b740..eb6974865f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -171,11 +171,11 @@ public class QueryExecution implements IQueryExecution { stateMachine.transitionToRunning(); return; } - long remainTime = context.getTimeOut() - (System.currentTimeMillis() - context.getStartTime()); - if (remainTime <= 0) { - throw new QueryTimeoutRuntimeException(); - } - context.setTimeOut(remainTime); +// long remainTime = context.getTimeOut() - (System.currentTimeMillis() - context.getStartTime()); +// if (remainTime <= 0) { +// throw new QueryTimeoutRuntimeException(); +// } +// context.setTimeOut(remainTime); doLogicalPlan(); doDistributedPlan(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java index 2aad1c8a85..5365fe2bf8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil; import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java index 51618cbe64..c0918156f8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java @@ -53,6 +53,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter; +import org.apache.ratis.thirdparty.io.opencensus.stats.Aggregation; import java.util.ArrayList; import java.util.Collections; @@ -531,6 +532,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte @Override public PlanNode visitGroupByLevel(GroupByLevelNode root, DistributionPlanContext context) { + if (shouldUseNaiveAggregation(root)) { + return defaultRewrite(root, context); + } // Firstly, we build the tree structure for GroupByLevelNode List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context); Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup = @@ -554,6 +558,22 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte return newRoot; } + // If the Aggregation Query contains value filter, we need to use the naive query plan + // for it. That is, do the raw data query and then do the aggregation operation. + // Currently, the method to judge whether the query should use naive query plan is whether + // AggregationNode is contained in the PlanNode tree of logical plan. + private boolean shouldUseNaiveAggregation(PlanNode root) { + if (root instanceof AggregationNode) { + return true; + } + for (PlanNode child : root.getChildren()) { + if (shouldUseNaiveAggregation(child)) { + return true; + } + } + return false; + } + private GroupByLevelNode groupSourcesForGroupByLevelWithSlidingWindow( GroupByLevelNode root, SlidingWindowAggregationNode slidingWindowNode,
