This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new 4e3e0fdb240 add temp index scan, prune column, distribute query planner
4e3e0fdb240 is described below
commit 4e3e0fdb24017f2c566095243b3f283b3b05d7bd
Author: Beyyes <[email protected]>
AuthorDate: Wed Apr 17 18:16:19 2024 +0800
add temp index scan, prune column, distribute query planner
---
.../planner/distribution/DistributionPlanner.java | 3 +
.../SimpleFragmentParallelPlanner.java | 4 +-
.../plan/relational/planner/LogicalPlanner.java | 11 +-
.../plan/relational/planner/RelationPlanner.java | 22 +--
.../relational/planner/RelationalModelPlanner.java | 4 +-
.../ExchangeNodeGenerator.java} | 20 ++-
.../distribute/FragmentInstanceGenerator.java} | 179 +++++++--------------
.../distribute/RelationalDistributionPlanner.java | 31 +++-
.../planner/distribute/SimplePlanRewriter.java | 44 +++++
.../planner/distribute/SubPlanGenerator.java | 77 +++++++++
.../relational/planner/node/TableScanNode.java | 16 +-
.../planner/optimizations/IndexScan.java | 93 ++++++++++-
...Expressions.java => PruneTableScanColumns.java} | 53 ++++--
.../optimizations/RelationalPlanOptimizer.java | 9 +-
.../RemoveRedundantIdentityProjections.java | 12 +-
.../planner/optimizations/SimplifyExpressions.java | 9 +-
.../plan/relational/analyzer/AnalyzerTest.java | 10 +-
17 files changed, 434 insertions(+), 163 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index 755119b2607..42920df7561 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -206,14 +206,17 @@ public class DistributionPlanner {
.getRespDatasetHeader()
.setColumnToTsBlockIndexMap(optimizedRootWithExchange.getOutputColumnNames());
}
+
SubPlan subPlan = splitFragment(optimizedRootWithExchange);
// Mark the root Fragment of root SubPlan as `root`
subPlan.getPlanFragment().setRoot(true);
+
List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
// Only execute this step for READ operation
if (context.getQueryType() == QueryType.READ) {
setSinkForRootInstance(subPlan, fragmentInstances);
}
+
return new DistributedQueryPlan(
logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(),
fragmentInstances);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 1394ba96503..a7b475a4e3c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -214,12 +214,12 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
if (availableDataNodes.isEmpty()) {
String errorMsg =
String.format(
- "all replicas for region[%s] are not available in these
DataNodes[%s]",
+ "All replicas for region[%s] are not available in these
DataNodes[%s]",
regionReplicaSet.getRegionId(),
regionReplicaSet.getDataNodeLocations());
throw new IllegalArgumentException(errorMsg);
}
if (regionReplicaSet.getDataNodeLocationsSize() !=
availableDataNodes.size()) {
- logger.info("available replicas: " + availableDataNodes);
+ logger.info("available replicas: {}", availableDataNodes);
}
int targetIndex;
if (!selectRandomDataNode || queryContext.getSession() == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index 5e54848763f..843517be036 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@ -24,6 +24,8 @@ import
org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.RelationType;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.IndexScan;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PruneTableScanColumns;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.RelationalPlanOptimizer;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.RemoveRedundantIdentityProjections;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.SimplifyExpressions;
@@ -60,13 +62,18 @@ public class LogicalPlanner {
this.warningCollector = requireNonNull(warningCollector, "warningCollector
is null");
this.relationalPlanOptimizers =
- Arrays.asList(new SimplifyExpressions(), new
RemoveRedundantIdentityProjections());
+ Arrays.asList(
+ new SimplifyExpressions(),
+ new RemoveRedundantIdentityProjections(),
+ new PruneTableScanColumns(),
+ new IndexScan());
}
public LogicalQueryPlan plan(Analysis analysis) throws IoTDBException {
PlanNode planNode = planStatement(analysis, analysis.getStatement());
- relationalPlanOptimizers.forEach(optimizer -> optimizer.optimize(planNode,
analysis, context));
+ relationalPlanOptimizers.forEach(
+ optimizer -> optimizer.optimize(planNode, analysis, metadata,
sessionInfo, context));
return new LogicalQueryPlan(context, planNode);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 584b9c1f80b..d8526986a92 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -49,39 +49,40 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
private final Analysis analysis;
private final SymbolAllocator symbolAllocator;
private final QueryId idAllocator;
- private final SessionInfo session;
+ private final SessionInfo sessionInfo;
private final Map<NodeRef<Node>, RelationPlan> recursiveSubqueries;
public RelationPlanner(
Analysis analysis,
SymbolAllocator symbolAllocator,
QueryId idAllocator,
- SessionInfo session,
+ SessionInfo sessionInfo,
Map<NodeRef<Node>, RelationPlan> recursiveSubqueries) {
requireNonNull(analysis, "analysis is null");
requireNonNull(symbolAllocator, "symbolAllocator is null");
requireNonNull(idAllocator, "idAllocator is null");
- requireNonNull(session, "session is null");
+ requireNonNull(sessionInfo, "session is null");
requireNonNull(recursiveSubqueries, "recursiveSubqueries is null");
this.analysis = analysis;
this.symbolAllocator = symbolAllocator;
this.idAllocator = idAllocator;
- this.session = session;
+ this.sessionInfo = sessionInfo;
this.recursiveSubqueries = recursiveSubqueries;
}
@Override
protected RelationPlan visitQuery(Query node, Void context) {
- return new QueryPlanner(analysis, symbolAllocator, idAllocator, session,
recursiveSubqueries)
+ return new QueryPlanner(
+ analysis, symbolAllocator, idAllocator, sessionInfo,
recursiveSubqueries)
.plan(node);
}
@Override
- protected RelationPlan visitTable(Table node, Void context) {
+ protected RelationPlan visitTable(Table table, Void context) {
// is this a recursive reference in expandable named query? If so, there's
base relation already
// planned.
- RelationPlan expansion = recursiveSubqueries.get(NodeRef.of(node));
+ RelationPlan expansion = recursiveSubqueries.get(NodeRef.of(table));
if (expansion != null) {
// put the pre-planned recursive subquery in the actual outer context to
enable resolving
// correlation
@@ -89,7 +90,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan,
Void> {
expansion.getRoot(), expansion.getScope(),
expansion.getFieldMappings());
}
- Scope scope = analysis.getScope(node);
+ Scope scope = analysis.getScope(table);
ImmutableList.Builder<Symbol> outputSymbolsBuilder =
ImmutableList.builder();
ImmutableMap.Builder<Symbol, ColumnSchema> symbolToColumnSchema =
ImmutableMap.builder();
Collection<Field> fields = scope.getRelationType().getAllFields();
@@ -106,7 +107,7 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
PlanNode root =
new TableScanNode(
idAllocator.genPlanNodeId(),
- node.getName().toString(),
+ table.getName().toString(),
outputSymbols,
symbolToColumnSchema.build());
@@ -121,7 +122,8 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
@Override
protected RelationPlan visitQuerySpecification(QuerySpecification node, Void
context) {
- return new QueryPlanner(analysis, symbolAllocator, idAllocator, session,
recursiveSubqueries)
+ return new QueryPlanner(
+ analysis, symbolAllocator, idAllocator, sessionInfo,
recursiveSubqueries)
.plan(node);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
index 713be1450bd..b9a6604e708 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
@@ -117,7 +117,9 @@ public class RelationalModelPlanner implements IPlanner {
@Override
public DistributedQueryPlan doDistributionPlan(IAnalysis analysis,
LogicalQueryPlan logicalPlan) {
- return new RelationalDistributionPlanner((Analysis) analysis,
logicalPlan).planFragments();
+ return new RelationalDistributionPlanner(
+ (Analysis) analysis, logicalPlan, logicalPlan.getContext())
+ .plan();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java
similarity index 56%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java
index 21a54da4e73..d11f3a1ee82 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java
@@ -11,17 +11,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
-package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
-
-import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+
+import java.util.Collections;
+import java.util.List;
-public class IndexScan implements RelationalPlanOptimizer {
+public class ExchangeNodeGenerator
+ extends SimplePlanRewriter<ExchangeNodeGenerator.DistributionPlanContext> {
@Override
- public PlanNode optimize(PlanNode planNode, Analysis analysis,
MPPQueryContext context) {
- return null;
+ public List<PlanNode> visitTableScan(
+ TableScanNode node, ExchangeNodeGenerator.DistributionPlanContext
context) {
+ // TODO handle the case where the data of a TableScanNode is located in
multiple data regions
+ return Collections.singletonList(node);
}
+
+ public static class DistributionPlanContext {}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/FragmentInstanceGenerator.java
similarity index 54%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/FragmentInstanceGenerator.java
index 1394ba96503..e15e8ec00ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/FragmentInstanceGenerator.java
@@ -1,22 +1,17 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.iotdb.db.queryengine.plan.planner.distribution;
+package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -27,9 +22,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
-import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
-import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
@@ -38,12 +33,8 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastSeriesSourceNode;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
-import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.relational.sql.tree.Query;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -54,106 +45,75 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-/**
- * A simple implementation of IFragmentParallelPlaner. This planner will
transform one PlanFragment
- * into only one FragmentInstance.
- */
-public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
- private static final Logger logger =
LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class);
+public class FragmentInstanceGenerator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FragmentInstanceGenerator.class);
private final SubPlan subPlan;
+
private final Analysis analysis;
+
+ private final List<FragmentInstance> fragmentInstanceList = new
ArrayList<>();
+
private final MPPQueryContext queryContext;
// Record all the FragmentInstances belonged to same PlanFragment
- private final Map<PlanFragmentId, FragmentInstance> instanceMap;
+ private final Map<PlanFragmentId, FragmentInstance> instanceMap = new
HashMap<>();
+
// Record which PlanFragment the PlanNode belongs
- private final Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
- private final List<FragmentInstance> fragmentInstanceList;
+ private final Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap =
new HashMap<>();
// Record FragmentInstances dispatched to same DataNode
- private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap;
+ private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap =
new HashMap<>();
- public SimpleFragmentParallelPlanner(
- SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
+ FragmentInstanceGenerator(SubPlan subPlan, Analysis analysis,
MPPQueryContext queryContext) {
this.subPlan = subPlan;
this.analysis = analysis;
- this.queryContext = context;
- this.instanceMap = new HashMap<>();
- this.planNodeMap = new HashMap<>();
- this.fragmentInstanceList = new ArrayList<>();
- this.dataNodeFIMap = new HashMap<>();
+ this.queryContext = queryContext;
}
- @Override
- public List<FragmentInstance> parallelPlan() {
+ public List<FragmentInstance> plan() {
prepare();
calculateNodeTopologyBetweenInstance();
return fragmentInstanceList;
}
private void prepare() {
- List<PlanFragment> fragments = subPlan.getPlanFragmentList();
- for (PlanFragment fragment : fragments) {
+ for (PlanFragment fragment : subPlan.getPlanFragmentList()) {
recordPlanNodeRelation(fragment.getPlanNodeTree(), fragment.getId());
produceFragmentInstance(fragment);
}
- fragmentInstanceList.forEach(
- fragmentInstance ->
- fragmentInstance.setDataNodeFINum(
- dataNodeFIMap.get(fragmentInstance.getHostDataNode()).size()));
- // compute dataNodeSeriesScanNum in LastQueryScanNode
- if (analysis.getStatement() instanceof QueryStatement
- && ((QueryStatement) analysis.getStatement()).isLastQuery()) {
- final Map<Path, AtomicInteger> pathSumMap = new HashMap<>();
- dataNodeFIMap
- .values()
- .forEach(
- fragmentInstances -> {
- fragmentInstances.forEach(
- fragmentInstance ->
- updateScanNum(
- fragmentInstance.getFragment().getPlanNodeTree(),
pathSumMap));
- pathSumMap.clear();
- });
- }
+ fragmentInstanceList.forEach(
+ fi ->
fi.setDataNodeFINum(dataNodeFIMap.get(fi.getHostDataNode()).size()));
}
- private void updateScanNum(PlanNode planNode, Map<Path, AtomicInteger>
pathSumMap) {
- if (planNode instanceof LastSeriesSourceNode) {
- LastSeriesSourceNode lastSeriesSourceNode = (LastSeriesSourceNode)
planNode;
- pathSumMap.merge(
- lastSeriesSourceNode.getSeriesPath(),
- lastSeriesSourceNode.getDataNodeSeriesScanNum(),
- (k, v) -> {
- v.incrementAndGet();
- return v;
- });
- }
- planNode.getChildren().forEach(node -> updateScanNum(node, pathSumMap));
+ private void recordPlanNodeRelation(PlanNode root, PlanFragmentId
planFragmentId) {
+ planNodeMap.put(root.getPlanNodeId(), new Pair<>(planFragmentId, root));
+ root.getChildren().forEach(child -> recordPlanNodeRelation(child,
planFragmentId));
}
private void produceFragmentInstance(PlanFragment fragment) {
- Expression globalTimePredicate = analysis.getGlobalTimePredicate();
+ // TODO fix globalTimePredicate
+ // Expression globalTimePredicate = analysis.getGlobalTimePredicate();
+ Expression globalTimePredicate = null;
FragmentInstance fragmentInstance =
new FragmentInstance(
fragment,
fragment.getId().genFragmentInstanceId(),
globalTimePredicate == null ? null : new
TreeModelTimePredicate(globalTimePredicate),
- queryContext.getQueryType(),
+ QueryType.READ,
queryContext.getTimeOut(),
queryContext.getSession(),
- queryContext.isExplainAnalyze(),
+ false,
fragment.isRoot());
// Get the target region for the original PlanFragment; its instance will
be distributed to one
// of them.
TRegionReplicaSet regionReplicaSet = fragment.getTargetRegion();
- // Set ExecutorType and target host for the instance
+ // Set ExecutorType and target host for the instance,
// We need to store all the replica hosts in case of the scenario that the
instance needs to be
// redirected
// to another host when scheduling
@@ -183,11 +143,7 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
return v;
});
- if (analysis.getStatement() instanceof QueryStatement
- || analysis.getStatement() instanceof ExplainAnalyzeStatement
- || analysis.getStatement() instanceof ShowQueriesStatement
- || (analysis.getStatement() instanceof ShowTimeSeriesStatement
- && ((ShowTimeSeriesStatement)
analysis.getStatement()).isOrderByHeat())) {
+ if (analysis.getStatement() instanceof Query) {
fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
}
instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
@@ -199,12 +155,10 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
|| regionReplicaSet.getDataNodeLocations() == null
|| regionReplicaSet.getDataNodeLocations().isEmpty()) {
throw new IllegalArgumentException(
- String.format("regionReplicaSet is invalid: %s", regionReplicaSet));
+ String.format("RegionReplicaSet is invalid: %s", regionReplicaSet));
}
String readConsistencyLevel =
IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel();
- // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel
as static variable or
- // enums
boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
// When planning fragment onto specific DataNode, the DataNode whose
endPoint is in
@@ -214,12 +168,12 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
if (availableDataNodes.isEmpty()) {
String errorMsg =
String.format(
- "all replicas for region[%s] are not available in these
DataNodes[%s]",
+ "All replicas for region[%s] are not available in these
DataNodes[%s]",
regionReplicaSet.getRegionId(),
regionReplicaSet.getDataNodeLocations());
throw new IllegalArgumentException(errorMsg);
}
if (regionReplicaSet.getDataNodeLocationsSize() !=
availableDataNodes.size()) {
- logger.info("available replicas: " + availableDataNodes);
+ LOGGER.info("Available replicas: {}", availableDataNodes);
}
int targetIndex;
if (!selectRandomDataNode || queryContext.getSession() == null) {
@@ -255,27 +209,25 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
PlanNode rootNode = instance.getFragment().getPlanNodeTree();
if (rootNode instanceof MultiChildrenSinkNode) {
MultiChildrenSinkNode sinkNode = (MultiChildrenSinkNode) rootNode;
- sinkNode
- .getDownStreamChannelLocationList()
- .forEach(
- downStreamChannelLocation -> {
- // Set target Endpoint for FragmentSinkNode
- PlanNodeId downStreamNodeId =
- new
PlanNodeId(downStreamChannelLocation.getRemotePlanNodeId());
- FragmentInstance downStreamInstance =
findDownStreamInstance(downStreamNodeId);
- downStreamChannelLocation.setRemoteEndpoint(
-
downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint());
- downStreamChannelLocation.setRemoteFragmentInstanceId(
- downStreamInstance.getId().toThrift());
-
- // Set upstream info for corresponding ExchangeNode in
downstream FragmentInstance
- PlanNode downStreamExchangeNode =
planNodeMap.get(downStreamNodeId).right;
- ((ExchangeNode) downStreamExchangeNode)
- .setUpstream(
-
instance.getHostDataNode().getMPPDataExchangeEndPoint(),
- instance.getId(),
- sinkNode.getPlanNodeId());
- });
+ for (DownStreamChannelLocation downStreamChannelLocation :
+ sinkNode.getDownStreamChannelLocationList()) {
+ // Set target Endpoint for FragmentSinkNode
+ PlanNodeId downStreamNodeId =
+ new PlanNodeId(downStreamChannelLocation.getRemotePlanNodeId());
+ FragmentInstance downStreamInstance =
findDownStreamInstance(downStreamNodeId);
+ downStreamChannelLocation.setRemoteEndpoint(
+
downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint());
+ downStreamChannelLocation.setRemoteFragmentInstanceId(
+ downStreamInstance.getId().toThrift());
+
+ // Set upstream info for corresponding ExchangeNode in downstream
FragmentInstance
+ PlanNode downStreamExchangeNode =
planNodeMap.get(downStreamNodeId).right;
+ ((ExchangeNode) downStreamExchangeNode)
+ .setUpstream(
+ instance.getHostDataNode().getMPPDataExchangeEndPoint(),
+ instance.getId(),
+ sinkNode.getPlanNodeId());
+ }
}
}
}
@@ -283,11 +235,4 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) {
return instanceMap.get(planNodeMap.get(exchangeNodeId).left);
}
-
- private void recordPlanNodeRelation(PlanNode root, PlanFragmentId
planFragmentId) {
- planNodeMap.put(root.getPlanNodeId(), new Pair<>(planFragmentId, root));
- for (PlanNode child : root.getChildren()) {
- recordPlanNodeRelation(child, planFragmentId);
- }
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
index de946fb8c33..64d2aa70599 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
@@ -15,21 +15,44 @@ package
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import java.util.List;
+
public class RelationalDistributionPlanner {
private final Analysis analysis;
private final LogicalQueryPlan logicalQueryPlan;
private final MPPQueryContext context;
- public RelationalDistributionPlanner(Analysis analysis, LogicalQueryPlan
logicalQueryPlan) {
+ public RelationalDistributionPlanner(
+ Analysis analysis, LogicalQueryPlan logicalQueryPlan, MPPQueryContext
context) {
this.analysis = analysis;
this.logicalQueryPlan = logicalQueryPlan;
- this.context = null;
+ this.context = context;
}
- public DistributedQueryPlan planFragments() {
- return null;
+ public DistributedQueryPlan plan() {
+ List<PlanNode> distributedPlanNodeResult =
+ new ExchangeNodeGenerator()
+ .visitPlan(
+ logicalQueryPlan.getRootNode(),
+ new ExchangeNodeGenerator.DistributionPlanContext());
+
+ if (distributedPlanNodeResult.size() != 1) {
+ throw new IllegalStateException("root node must return only one");
+ }
+
+ SubPlan subPlan = new SubPlanGenerator().splitToSubPlan(logicalQueryPlan);
+ subPlan.getPlanFragment().setRoot(true);
+
+ List<FragmentInstance> fragmentInstances =
+ new FragmentInstanceGenerator(subPlan, analysis, context).plan();
+
+ return new DistributedQueryPlan(
+ logicalQueryPlan.getContext(), subPlan, subPlan.getPlanFragmentList(),
fragmentInstances);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SimplePlanRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SimplePlanRewriter.java
new file mode 100644
index 00000000000..9c5ac404c6b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SimplePlanRewriter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+
+import java.util.Collections;
+import java.util.List;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+
+public class SimplePlanRewriter<C> extends PlanVisitor<List<PlanNode>, C> {
+
+ @Override
+ public List<PlanNode> visitPlan(PlanNode node, C context) {
+ if (node instanceof WritePlanNode) {
+ return Collections.singletonList(node);
+ }
+
+ List<List<PlanNode>> children =
+ node.getChildren().stream()
+ .map(child -> child.accept(this, context))
+ .collect(toImmutableList());
+
+ PlanNode newNode = node.clone();
+ for (List<PlanNode> planNodes : children) {
+ planNodes.forEach(newNode::addChild);
+ }
+ return Collections.singletonList(newNode);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java
new file mode 100644
index 00000000000..de160e61145
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode;
+
+import org.apache.commons.lang3.Validate;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Cuts a logical query plan into a tree of SubPlans, using each ExchangeNode as a boundary. */
+public class SubPlanGenerator {
+
+  /**
+   * Builds the SubPlan tree for a logical plan.
+   *
+   * @param logicalQueryPlan plan whose root node becomes the root of the first fragment
+   * @return the root SubPlan; child SubPlans hang off it for every sink found below an exchange
+   */
+  public SubPlan splitToSubPlan(LogicalQueryPlan logicalQueryPlan) {
+    QueryId queryId = logicalQueryPlan.getContext().getQueryId();
+    SubPlan rootSubPlan = createSubPlan(logicalQueryPlan.getRootNode(), queryId);
+    splitToSubPlan(logicalQueryPlan.getRootNode(), rootSubPlan, new HashSet<>(), queryId);
+    return rootSubPlan;
+  }
+
+  /**
+   * Recursively walks {@code root}; whenever an ExchangeNode is met, its child sink is detached
+   * and (if not seen before) turned into a child SubPlan of {@code subPlan}.
+   */
+  private void splitToSubPlan(
+      PlanNode root, SubPlan subPlan, Set<PlanNodeId> visitedSinkNode, QueryId queryId) {
+    if (root instanceof WritePlanNode) {
+      return;
+    }
+
+    if (!(root instanceof ExchangeNode)) {
+      // Ordinary node: keep descending inside the current fragment.
+      for (PlanNode child : root.getChildren()) {
+        splitToSubPlan(child, subPlan, visitedSinkNode, queryId);
+      }
+      return;
+    }
+
+    // A sink node is expected directly below every exchange.
+    ExchangeNode exchange = (ExchangeNode) root;
+    PlanNode exchangeChild = exchange.getChild();
+    Validate.isTrue(
+        exchangeChild instanceof MultiChildrenSinkNode,
+        "child of ExchangeNode must be MultiChildrenSinkNode");
+    MultiChildrenSinkNode sink = (MultiChildrenSinkNode) exchangeChild;
+
+    // Detach the subtree so the exchange becomes a leaf of the current PlanFragment.
+    exchange.cleanChildren();
+
+    // Set#add reports whether the id was new, i.e. whether this sink still needs a SubPlan.
+    if (visitedSinkNode.add(sink.getPlanNodeId())) {
+      SubPlan childSubPlan = createSubPlan(sink, queryId);
+      splitToSubPlan(sink, childSubPlan, visitedSinkNode, queryId);
+      subPlan.addChild(childSubPlan);
+    }
+  }
+
+  /** Wraps {@code root} in a new PlanFragment (with a freshly generated id) and a SubPlan. */
+  private SubPlan createSubPlan(PlanNode root, QueryId queryId) {
+    return new SubPlan(new PlanFragment(queryId.genPlanFragmentId(), root));
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
index 978978b2088..6804c056cac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
@@ -10,6 +10,8 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.relational.sql.tree.Expression;
+import com.google.common.collect.ImmutableList;
+
import javax.annotation.Nullable;
import java.io.DataOutputStream;
@@ -65,7 +67,7 @@ public class TableScanNode extends PlanNode {
@Override
public List<PlanNode> getChildren() {
- return null;
+ // Return an empty list rather than null so callers can iterate the children safely.
+ return ImmutableList.of();
}
@Override
@@ -123,6 +125,18 @@ public class TableScanNode extends PlanNode {
return this.qualifiedTableName;
}
+ /** Returns the device entries currently stored on this scan node. */
+ public List<DeviceEntry> getDeviceEntries() {
+ return this.deviceEntries;
+ }
+
+ /** Replaces the device entries stored on this scan node with the given list (not copied). */
+ public void setDeviceEntries(List<DeviceEntry> deviceEntries) {
+ this.deviceEntries = deviceEntries;
+ }
+
+ /** Returns the symbol-to-index map held in the idAndAttributeIndexMap field. */
+ public Map<Symbol, Integer> getIdAndAttributeIndexMap() {
+ return this.idAndAttributeIndexMap;
+ }
+
public Map<Symbol, ColumnSchema> getAssignments() {
return this.assignments;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
index 21a54da4e73..c52131813c9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
@@ -15,13 +15,102 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.relational.sql.tree.Expression;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ATTRIBUTE;
+
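+/**
+ * Fills in the device entries of visited TableScanNodes by calling Metadata#indexScan with the
+ * predicate held in the rewriter context and the scan's attribute columns.
+ */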
public class IndexScan implements RelationalPlanOptimizer {
@Override
- public PlanNode optimize(PlanNode planNode, Analysis analysis,
MPPQueryContext context) {
- return null;
+ public PlanNode optimize(
+ PlanNode planNode,
+ Analysis analysis,
+ Metadata metadata,
+ SessionInfo sessionInfo,
+ MPPQueryContext context) {
+ // The context starts with no predicate (null); Rewriter#visitFilter records one later.
+ return planNode.accept(new Rewriter(), new RewriterContext(null, metadata,
sessionInfo));
+ }
+
+  /**
+   * Walks the plan and stores on every reachable TableScanNode the device entries returned by
+   * Metadata#indexScan. The plan structure itself is left unchanged; each visit returns the node
+   * it was given.
+   */
+  private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> {
+    @Override
+    public PlanNode visitPlan(PlanNode node, RewriterContext context) {
+      for (PlanNode child : node.getChildren()) {
+        child.accept(this, context);
+      }
+      return node;
+    }
+
+    @Override
+    public PlanNode visitFilter(FilterNode node, RewriterContext context) {
+      // Expose this filter's predicate to the scans below it, and actually descend into the
+      // child: without the descent a TableScanNode under a FilterNode is never visited.
+      Expression enclosingPredicate = context.getPredicate();
+      context.setPredicate(node.getPredicate());
+      node.getChild().accept(this, context);
+      // Restore the previous predicate so sibling subtrees do not inherit this filter.
+      context.setPredicate(enclosingPredicate);
+      return node;
+    }
+
+    @Override
+    public PlanNode visitTableScan(TableScanNode node, RewriterContext context) {
+      List<String> attributeColumns =
+          node.getAssignments().entrySet().stream()
+              .filter(e -> e.getValue().getColumnCategory().equals(ATTRIBUTE))
+              .map(e -> e.getKey().getName())
+              .collect(Collectors.toList());
+
+      // A scan with no enclosing filter has no predicate; pass an empty list instead of a list
+      // that contains null.
+      Expression predicate = context.getPredicate();
+      List<Expression> predicates =
+          predicate == null
+              ? Collections.<Expression>emptyList()
+              : Collections.singletonList(predicate);
+
+      // TODO extract predicate to expression list
+      // NOTE(review): getDatabaseName().get() is unchecked - confirm a database is always set.
+      List<DeviceEntry> deviceEntries =
+          context
+              .getMetadata()
+              .indexScan(
+                  new QualifiedObjectName(
+                      context.getSessionInfo().getDatabaseName().get(),
+                      node.getQualifiedTableName()),
+                  predicates,
+                  attributeColumns);
+      node.setDeviceEntries(deviceEntries);
+      return node;
+    }
+  }
+
+ /** State carried through one traversal by the Rewriter: predicate, metadata and session. */
+ private static class RewriterContext {
+ // Predicate last recorded through setPredicate; may be null if none has been set.
+ private Expression predicate;
+ private Metadata metadata;
+ private final SessionInfo sessionInfo;
+
+ RewriterContext(Expression predicate, Metadata metadata, SessionInfo
sessionInfo) {
+ this.predicate = predicate;
+ this.metadata = metadata;
+ this.sessionInfo = sessionInfo;
+ }
+
+ /** Returns the current predicate, or null when none has been set. */
+ public Expression getPredicate() {
+ return this.predicate;
+ }
+
+ public void setPredicate(Expression predicate) {
+ this.predicate = predicate;
+ }
+
+ public Metadata getMetadata() {
+ return this.metadata;
+ }
+
+ public void setMetadata(Metadata metadata) {
+ this.metadata = metadata;
+ }
+
+ public SessionInfo getSessionInfo() {
+ return this.sessionInfo;
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java
similarity index 50%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java
index 1e5839fe32a..dc75af69b26 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java
@@ -15,26 +15,36 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
-import org.apache.iotdb.db.relational.sql.tree.Expression;
+import org.apache.iotdb.db.relational.sql.tree.DefaultTraversalVisitor;
+import org.apache.iotdb.db.relational.sql.tree.SymbolReference;
-import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ExtractCommonPredicatesExpressionRewriter.extractCommonPredicates;
-import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.NormalizeOrExpressionRewriter.normalizeOrExpression;
+import com.google.common.collect.ImmutableList;
-public class SimplifyExpressions implements RelationalPlanOptimizer {
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+public class PruneTableScanColumns implements RelationalPlanOptimizer {
@Override
- public PlanNode optimize(PlanNode planNode, Analysis analysis,
MPPQueryContext context) {
- // TODO add query statement pruning
+ public PlanNode optimize(
+ PlanNode planNode,
+ Analysis analysis,
+ Metadata metadata,
+ SessionInfo sessionInfo,
+ MPPQueryContext context) {
+ // A fresh RewriterContext collects the symbols seen while the Rewriter walks the plan.
return planNode.accept(new Rewriter(), new RewriterContext());
}
private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext>
{
-
@Override
public PlanNode visitPlan(PlanNode node, RewriterContext context) {
for (PlanNode child : node.getChildren()) {
@@ -43,11 +53,20 @@ public class SimplifyExpressions implements
RelationalPlanOptimizer {
return node;
}
+ // Records the projection's output symbols in the context, then visits its child.
+ // The ProjectNode itself is returned unchanged.
+ @Override
+ public PlanNode visitProject(ProjectNode node, RewriterContext context) {
+ context.symbolHashSet.addAll(node.getOutputSymbols());
+ node.getChild().accept(this, context);
+ return node;
+ }
+
@Override
public PlanNode visitFilter(FilterNode node, RewriterContext context) {
- Expression predicate = normalizeOrExpression(node.getPredicate());
- predicate = extractCommonPredicates(predicate);
- node.setPredicate(predicate);
+ // Collect every symbol referenced by the predicate. The visitor must fill symbolBuilder
+ // itself; handing it a separate throwaway builder leaves the collected list empty.
+ ImmutableList.Builder<Symbol> symbolBuilder = ImmutableList.builder();
+ new SymbolBuilderVisitor().process(node.getPredicate(), symbolBuilder);
+ List<Symbol> ret = symbolBuilder.build();
+ context.symbolHashSet.addAll(ret);
+ // Continue below the filter; the FilterNode itself is returned unchanged.
+ node.getChild().accept(this, context);
return node;
}
@@ -57,5 +76,17 @@ public class SimplifyExpressions implements
RelationalPlanOptimizer {
}
}
- private static class RewriterContext {}
+ /** Expression visitor that appends the Symbol of each visited SymbolReference to a builder. */
+ private static class SymbolBuilderVisitor
+ extends DefaultTraversalVisitor<ImmutableList.Builder<Symbol>> {
+ @Override
+ protected Void visitSymbolReference(
+ SymbolReference node, ImmutableList.Builder<Symbol> builder) {
+ // The builder passed as visitor context is the output collection.
+ builder.add(Symbol.from(node));
+ return null;
+ }
+ }
+
+ /** Traversal state: the symbols added by visitProject and visitFilter. */
+ private static class RewriterContext {
+ Set<Symbol> symbolHashSet = new HashSet<>();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java
index df7cb945167..37611e6a35c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java
@@ -15,9 +15,16 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
public interface RelationalPlanOptimizer {
- PlanNode optimize(PlanNode planNode, Analysis analysis, MPPQueryContext
context);
+ /**
+ * Runs this optimizer over the given plan.
+ *
+ * @param planNode root of the plan to process
+ * @param analysis analysis result supplied by the caller
+ * @param metadata metadata access supplied by the caller
+ * @param sessionInfo session information supplied by the caller
+ * @param context query context supplied by the caller
+ * @return the plan node produced by the optimizer
+ */
+ PlanNode optimize(
+ PlanNode planNode,
+ Analysis analysis,
+ Metadata metadata,
+ SessionInfo sessionInfo,
+ MPPQueryContext context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
index 34da737fd93..67027079a17 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
@@ -15,10 +15,12 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
@@ -28,7 +30,12 @@ import java.util.List;
public class RemoveRedundantIdentityProjections implements
RelationalPlanOptimizer {
@Override
- public PlanNode optimize(PlanNode planNode, Analysis analysis,
MPPQueryContext context) {
+ public PlanNode optimize(
+ PlanNode planNode,
+ Analysis analysis,
+ Metadata metadata,
+ SessionInfo sessionInfo,
+ MPPQueryContext context) {
+ // Only planNode is used here; the other parameters are not read by this method.
return planNode.accept(new Rewriter(), new RewriterContext());
}
@@ -60,8 +67,9 @@ public class RemoveRedundantIdentityProjections implements
RelationalPlanOptimiz
}
}
}
- return projectNode.getChild();
+ return projectNode.getChild().accept(this, context);
} else {
+ projectNode.getChild().accept(this, context);
return projectNode;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
index 1e5839fe32a..782c9705136 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
@@ -15,9 +15,11 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import org.apache.iotdb.db.relational.sql.tree.Expression;
@@ -28,7 +30,12 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.Normali
public class SimplifyExpressions implements RelationalPlanOptimizer {
@Override
- public PlanNode optimize(PlanNode planNode, Analysis analysis,
MPPQueryContext context) {
+ public PlanNode optimize(
+ PlanNode planNode,
+ Analysis analysis,
+ Metadata metadata,
+ SessionInfo sessionInfo,
+ MPPQueryContext context) {
+ // Only planNode is used here; the other parameters are not read by this method.
// TODO add query statement pruning
return planNode.accept(new Rewriter(), new RewriterContext());
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index 34eca515305..8e562f742e6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectN
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableHandle;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
import org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.RelationalDistributionPlanner;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
import org.apache.iotdb.db.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.relational.sql.tree.Statement;
@@ -145,8 +146,13 @@ public class AnalyzerTest {
WarningCollector warningCollector = WarningCollector.NOOP;
LogicalPlanner logicalPlanner =
new LogicalPlanner(context, metadata, sessionInfo, warningCollector);
- LogicalQueryPlan result = logicalPlanner.plan(actualAnalysis);
- System.out.println(result);
+ LogicalQueryPlan logicalQueryPlan = logicalPlanner.plan(actualAnalysis);
+ System.out.println(logicalQueryPlan);
+
+ RelationalDistributionPlanner distributionPlanner =
+ new RelationalDistributionPlanner(actualAnalysis, logicalQueryPlan,
context);
+ // DistributedQueryPlan distributedQueryPlan = distributionPlanner.plan();
+ // System.out.println(distributedQueryPlan);
}
public static Analysis analyzeSQL(String sql, Metadata metadata) {