This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch multi_stage_query_engine
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/multi_stage_query_engine by
this push:
new 5984af9 Add pinot-query-planner module (#8340)
5984af9 is described below
commit 5984af97d66413d2b50d26647828a8ec8bba6e67
Author: Rong Rong <[email protected]>
AuthorDate: Thu Mar 24 21:13:51 2022 -0700
Add pinot-query-planner module (#8340)
* add pinot-query-planner
- fix calcite upgrade compilation issue
- fix query compilation runtime after calcite 1.29 upgrade
- linter
* address diff comments and add more TODOs
Co-authored-by: Rong Rong <[email protected]>
---
.../pinot/common/config/provider/TableCache.java | 10 +
pinot-query-planner/pom.xml | 92 ++++
.../apache/calcite/jdbc/CalciteSchemaBuilder.java | 52 +++
.../org/apache/pinot/query/QueryEnvironment.java | 181 ++++++++
.../apache/pinot/query/catalog/PinotCatalog.java | 122 +++++
.../org/apache/pinot/query/catalog/PinotTable.java | 61 +++
.../apache/pinot/query/context/PlannerContext.java | 40 ++
.../query/parser/CalciteExpressionParser.java | 502 +++++++++++++++++++++
.../pinot/query/parser/CalciteSqlParser.java | 148 ++++++
.../org/apache/pinot/query/parser/ParserUtils.java | 63 +++
.../apache/pinot/query/parser/QueryRewriter.java | 46 ++
.../apache/pinot/query/planner/LogicalPlanner.java | 63 +++
.../org/apache/pinot/query/planner/QueryPlan.java | 60 +++
.../pinot/query/planner/RelToStageConverter.java | 71 +++
.../apache/pinot/query/planner/StageMetadata.java | 85 ++++
.../apache/pinot/query/planner/StagePlanner.java | 126 ++++++
.../query/planner/nodes/AbstractStageNode.java | 49 ++
.../apache/pinot/query/planner/nodes/CalcNode.java | 40 ++
.../apache/pinot/query/planner/nodes/JoinNode.java | 86 ++++
.../query/planner/nodes/MailboxReceiveNode.java | 54 +++
.../pinot/query/planner/nodes/MailboxSendNode.java | 55 +++
.../pinot/query/planner/nodes/StageNode.java | 40 ++
.../pinot/query/planner/nodes/TableScanNode.java | 57 +++
.../partitioning/FieldSelectionKeySelector.java | 39 ++
.../query/planner/partitioning/KeySelector.java | 37 ++
.../apache/pinot/query/routing/WorkerInstance.java | 51 +++
.../apache/pinot/query/routing/WorkerManager.java | 95 ++++
.../query/rules/PinotExchangeNodeInsertRule.java | 82 ++++
.../pinot/query/rules/PinotQueryRuleSets.java | 91 ++++
.../org/apache/pinot/query/type/TypeFactory.java | 84 ++++
.../org/apache/pinot/query/type/TypeSystem.java | 30 ++
.../org/apache/pinot/query/validate/Validator.java | 36 ++
.../apache/pinot/query/QueryEnvironmentTest.java | 122 +++++
.../pinot/query/QueryEnvironmentTestUtils.java | 131 ++++++
pom.xml | 6 +-
35 files changed, 2903 insertions(+), 4 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index f708d42..3871f3d 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -206,6 +206,16 @@ public class TableCache implements PinotConfigProvider {
}
}
+  /**
+   * Returns a read-only view of the map between lower-case table names and their canonicalized form, so a key and
+   * its value differ at most in letter case.
+   *
+   * <p>The returned view is backed by this cache's internal map: it reflects later updates made by the cache, but
+   * callers cannot modify the cache through it.
+   *
+   * @return read-only view of the table name map.
+   */
+  public Map<String, String> getTableNameMap() {
+    // Fully-qualified to avoid depending on this file's import list.
+    return java.util.Collections.unmodifiableMap(_tableNameMap);
+  }
+
private void addTableConfigs(List<String> paths) {
// Subscribe data changes before reading the data to avoid missing changes
for (String path : paths) {
diff --git a/pinot-query-planner/pom.xml b/pinot-query-planner/pom.xml
new file mode 100644
index 0000000..8d1af64
--- /dev/null
+++ b/pinot-query-planner/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>pinot</artifactId>
+ <groupId>org.apache.pinot</groupId>
+ <version>0.10.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>pinot-query-planner</artifactId>
+ <name>Pinot Query Planner</name>
+ <url>https://pinot.apache.org/</url>
+
+ <properties>
+ <pinot.root>${basedir}/..</pinot.root>
+ </properties>
+
+ <!-- In addition to the main jar, package this module's test classes via the maven-jar-plugin test-jar goal. -->
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <!-- Pinot dependencies -->
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-common</artifactId>
+ </dependency>
+
+ <!-- Calcite dependencies -->
+ <!-- calcite-core has no version here, so it presumably comes from the parent's dependencyManagement. -->
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ </dependency>
+ <!-- NOTE(review): janino and commons-compiler are pinned to 3.0.9 locally rather than managed by the parent
+      POM; confirm this matches the janino version expected by the calcite-core release in use. -->
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <version>3.0.9</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>commons-compiler</artifactId>
+ <version>3.0.9</version>
+ </dependency>
+
+ <!-- Test-only dependencies -->
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java
b/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java
new file mode 100644
index 0000000..ce3d1c9
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.jdbc;
+
+import org.apache.calcite.schema.Schema;
+
+
+/**
+ * Builds a {@link CalciteSchema} whose root is a caller-supplied {@link Schema}.
+ *
+ * <p>The class lives in the {@code org.apache.calcite.jdbc} package only so that it can reach the package-private
+ * {@link SimpleCalciteSchema}. Pinot has no catalog-based schema construction complex enough to need more than that.
+ */
+public class CalciteSchemaBuilder {
+
+  private CalciteSchemaBuilder() {
+    // Static factory holder; never instantiated.
+  }
+
+  /**
+   * Wraps {@code root} as the root of a new {@link CalciteSchema}.
+   *
+   * <p>Calcite has two layers of schema abstraction: {@link CalciteSchema} is used internally by the planner, while
+   * {@link Schema} is the user-facing layer with overrides. Pinot needs no custom internal wrapper, so the
+   * package-private {@link SimpleCalciteSchema} is reused as-is.
+   *
+   * <p>If planner functionality ever needs more than this, create a dedicated extension of {@link CalciteSchema}
+   * instead.
+   *
+   * @param root schema to use as the root schema
+   * @return calcite schema with the given schema as its root
+   */
+  public static CalciteSchema asRootSchema(Schema root) {
+    final CalciteSchema noParent = null;
+    final String rootName = "";
+    return new SimpleCalciteSchema(noParent, root, rootName);
+  }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
new file mode 100644
index 0000000..215c005
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query;
+
+import java.util.Collection;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.prepare.PlannerImpl;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.sql2rel.StandardConvertletTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.query.parser.CalciteSqlParser;
+import org.apache.pinot.query.planner.LogicalPlanner;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StagePlanner;
+import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.rules.PinotQueryRuleSets;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.validate.Validator;
+
+
+/**
+ * The {@code QueryEnvironment} contains the main entry point for query planning.
+ *
+ * <p>It provides the higher-level entry interface that converts a SQL string into a {@link QueryPlan}.
+ */
+public class QueryEnvironment {
+  // Calcite configurations
+  private final FrameworkConfig _config;
+
+  // Calcite extension/plugins
+  private final CalciteSchema _rootSchema;
+  private final PlannerImpl _planner;
+  private final Prepare.CatalogReader _catalogReader;
+  private final RelDataTypeFactory _typeFactory;
+  private final RelOptPlanner _relOptPlanner;
+  private final SqlValidator _validator;
+
+  // Pinot extensions
+  private final Collection<RelOptRule> _logicalRuleSet;
+  private final WorkerManager _workerManager;
+
+  /**
+   * Builds every planning component once so that it can be reused across queries.
+   *
+   * @param typeFactory type factory shared by the catalog reader, the validator and rel conversion.
+   * @param rootSchema root Calcite schema that table names are resolved against.
+   * @param workerManager worker manager handed to the stage planner.
+   */
+  public QueryEnvironment(TypeFactory typeFactory, CalciteSchema rootSchema, WorkerManager workerManager) {
+    _typeFactory = typeFactory;
+    _rootSchema = rootSchema;
+    _workerManager = workerManager;
+    _config = Frameworks.newConfigBuilder().traitDefs().build();
+
+    // The planner is not thread-safe; planQuery() closes and resets it after every use.
+    _planner = new PlannerImpl(_config);
+
+    // Catalog reader and validator both resolve names against the root schema.
+    _catalogReader = new CalciteCatalogReader(_rootSchema, _rootSchema.path(null), _typeFactory, null);
+    _validator = new Validator(SqlStdOperatorTable.instance(), _catalogReader, _typeFactory);
+
+    // Optimizer: a rule program with every logical rule added in the rule set's iteration order.
+    _logicalRuleSet = PinotQueryRuleSets.LOGICAL_OPT_RULES;
+    HepProgramBuilder programBuilder = new HepProgramBuilder();
+    _logicalRuleSet.forEach(programBuilder::addRuleInstance);
+    _relOptPlanner = new LogicalPlanner(programBuilder.build(), Contexts.EMPTY_CONTEXT);
+  }
+
+  /**
+   * Plans a SQL query.
+   *
+   * <p>Calcite's {@link org.apache.calcite.tools.Planner} is not thread-safe, so only one query can be planned at a
+   * time. Afterwards the planner must be reset to clear its state before the next planning.
+   *
+   * <p>For faster planning, all planner objects are pre-constructed and used in a plan-then-reset model. Thus, when
+   * using {@code QueryEnvironment#planQuery(String)}, the caller must ensure that no concurrent planning occurs.
+   *
+   * TODO: follow benchmark and profile to measure whether it makes sense, for the latency-concurrency trade-off, to
+   * reuse plannerImpl vs. create a new planner for each query.
+   *
+   * @param sqlQuery SQL query string.
+   * @return a dispatchable query plan
+   * @throws RuntimeException wrapping any {@link Exception} raised by one of the planning steps.
+   */
+  public QueryPlan planQuery(String sqlQuery) {
+    PlannerContext context = new PlannerContext();
+    try {
+      SqlNode validatedNode = validate(parse(sqlQuery, context));
+      RelRoot logicalRoot = toRelation(validatedNode, context);
+      RelNode optimizedPlan = optimize(logicalRoot, context);
+      return toDispatchablePlan(optimizedPlan, context);
+    } catch (Exception e) {
+      throw new RuntimeException("Error composing query plan for: " + sqlQuery, e);
+    } finally {
+      // Return the shared planner to a reusable state for the next query: close() first, then reset().
+      _planner.close();
+      _planner.reset();
+    }
+  }
+
+  // --------------------------------------------------------------------------
+  // steps
+  // --------------------------------------------------------------------------
+
+  /** Step 1: parses the SQL string into a {@link SqlNode} via {@link CalciteSqlParser}. */
+  protected SqlNode parse(String query, PlannerContext plannerContext)
+      throws Exception {
+    return CalciteSqlParser.compile(query, plannerContext);
+  }
+
+  /**
+   * Step 2: validates the parsed node.
+   *
+   * @throws IllegalArgumentException if validation yields no node or a node that is not a query.
+   */
+  protected SqlNode validate(SqlNode parsed)
+      throws Exception {
+    SqlNode validated = _validator.validate(parsed);
+    if (validated != null && validated.getKind().belongsTo(SqlKind.QUERY)) {
+      return validated;
+    }
+    throw new IllegalArgumentException(
+        String.format("unsupported SQL query, cannot validate out a valid sql from:\n%s", parsed));
+  }
+
+  /** Step 3: converts the (already validated) SQL node into a relational tree. */
+  protected RelRoot toRelation(SqlNode parsed, PlannerContext plannerContext) {
+    RelOptCluster cluster = RelOptCluster.create(_relOptPlanner, new RexBuilder(_typeFactory));
+    SqlToRelConverter converter = new SqlToRelConverter(_planner, _validator, _catalogReader, cluster,
+        StandardConvertletTable.INSTANCE, SqlToRelConverter.config());
+    // needsValidation=false because validate() already ran; top=true since this is the top-level query.
+    return converter.convertQuery(parsed, false, true);
+  }
+
+  /**
+   * Step 4: optimizes the relational tree with the logical rule program.
+   *
+   * TODO: add support for traits, cost factory.
+   *
+   * @throws UnsupportedOperationException if the optimizer fails, carrying the unoptimized plan in its message.
+   */
+  protected RelNode optimize(RelRoot relRoot, PlannerContext plannerContext) {
+    RelNode logicalPlan = relRoot.rel;
+    try {
+      _relOptPlanner.setRoot(logicalPlan);
+      return _relOptPlanner.findBestExp();
+    } catch (Exception e) {
+      throw new UnsupportedOperationException(
+          "Cannot generate a valid execution plan for the given query: " + RelOptUtil.toString(logicalPlan), e);
+    }
+  }
+
+  /** Step 5: splits the optimized plan into a dispatchable, stage-based {@link QueryPlan}. */
+  protected QueryPlan toDispatchablePlan(RelNode relRoot, PlannerContext plannerContext) {
+    return new StagePlanner(plannerContext, _workerManager).makePlan(relRoot);
+  }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
new file mode 100644
index 0000000..34673a2
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.catalog;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+import static java.util.Objects.requireNonNull;
+
+
+/**
+ * Simple catalog that only contains a flat list of tables, backed by {@link TableCache}.
+ *
+ * <p>A catalog is needed to use Apache Calcite's validator, which requires a root schema holding the entire catalog.
+ * Pinot has no nested sub-catalog concept, so this schema only exposes tables: it reports no types, no functions and
+ * no sub-schemas.
+ */
+public class PinotCatalog implements Schema {
+
+  private final TableCache _tableCache;
+
+  /**
+   * PinotCatalog needs access to the actual {@link TableCache} object because TableCache hosts the tables available
+   * for query and processes table/segment metadata updates when cluster status changes.
+   */
+  public PinotCatalog(TableCache tableCache) {
+    _tableCache = tableCache;
+  }
+
+  /**
+   * Acquires a table by its name.
+   *
+   * <p>The name is first converted with {@link TableNameBuilder#extractRawTableName(String)}. If the
+   * {@link TableCache} has no schema for the resulting table, {@code null} is returned. That is the "not found"
+   * result the {@link Schema#getTable(String)} contract specifies; the previous behavior produced a table whose row
+   * type could not be built.
+   *
+   * @param name name of the table.
+   * @return table object used by the calcite planner, or {@code null} if no schema is registered for the table.
+   */
+  @Override
+  @Nullable
+  public Table getTable(String name) {
+    String tableName = TableNameBuilder.extractRawTableName(name);
+    // Fully qualified: the simple name Schema refers to Calcite's schema interface in this file.
+    org.apache.pinot.spi.data.Schema schema = _tableCache.getSchema(tableName);
+    return schema == null ? null : new PinotTable(schema);
+  }
+
+  /**
+   * Acquires the set of available table names.
+   *
+   * <p>The set is a read-only view over the key set of {@link TableCache#getTableNameMap()}. Without the wrapper, a
+   * caller removing from the returned set would silently mutate the cache's map.
+   *
+   * @return the set of table names known to the cache at the time of query planning.
+   */
+  @Override
+  public Set<String> getTableNames() {
+    return Collections.unmodifiableSet(_tableCache.getTableNameMap().keySet());
+  }
+
+  /** No user-defined types are exposed; always returns {@code null}. */
+  @Override
+  public RelProtoDataType getType(String name) {
+    return null;
+  }
+
+  @Override
+  public Set<String> getTypeNames() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Collection<Function> getFunctions(String name) {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public Set<String> getFunctionNames() {
+    return Collections.emptySet();
+  }
+
+  /** The catalog is flat, so there are no sub-schemas; always returns {@code null}. */
+  @Override
+  public Schema getSubSchema(String name) {
+    return null;
+  }
+
+  @Override
+  public Set<String> getSubSchemaNames() {
+    return Collections.emptySet();
+  }
+
+  /**
+   * Returns the expression by which this schema can be referenced from its parent.
+   *
+   * @throws NullPointerException if {@code parentSchema} is null.
+   */
+  @Override
+  public Expression getExpression(@Nullable SchemaPlus parentSchema, String name) {
+    requireNonNull(parentSchema, "parentSchema");
+    return Schemas.subSchemaExpression(parentSchema, name, getClass());
+  }
+
+  @Override
+  public boolean isMutable() {
+    return false;
+  }
+
+  /** Returns this same instance regardless of {@code version}. */
+  @Override
+  public Schema snapshot(SchemaVersion version) {
+    return this;
+  }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java
new file mode 100644
index 0000000..23e6444
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.catalog;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Wrapper for Pinot internal info for a table.
+ *
+ * <p>This construct connects a Pinot table to Apache Calcite's relational planner. It gives the planner the
+ * {@link RelDataType} of the table, derived from the table's Pinot {@link Schema}.
+ */
+public class PinotTable extends AbstractTable implements ScannableTable {
+  // Pinot schema the row type is derived from; fixed at construction.
+  private final Schema _schema;
+
+  public PinotTable(Schema schema) {
+    _schema = schema;
+  }
+
+  /**
+   * Derives the Calcite row type from the Pinot schema.
+   *
+   * @param relDataTypeFactory type factory supplied by Calcite; must be Pinot's {@link TypeFactory}.
+   * @return the row type built by {@link TypeFactory#createRelDataTypeFromSchema}.
+   * @throws IllegalStateException if {@code relDataTypeFactory} is not a {@link TypeFactory}.
+   */
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
+    // Plain JDK check instead of relying on stream-lib's internal Preconditions utility.
+    if (!(relDataTypeFactory instanceof TypeFactory)) {
+      throw new IllegalStateException("Expected a " + TypeFactory.class.getName() + " to build the row type, got: "
+          + (relDataTypeFactory == null ? "null" : relDataTypeFactory.getClass().getName()));
+    }
+    return ((TypeFactory) relDataTypeFactory).createRelDataTypeFromSchema(_schema);
+  }
+
+  @Override
+  public boolean isRolledUp(String s) {
+    return false;
+  }
+
+  /**
+   * Not implemented: always returns {@code null}.
+   *
+   * <p>NOTE(review): presumably the planner never asks Calcite to execute this table directly; confirm before relying
+   * on {@link ScannableTable} execution.
+   */
+  @Override
+  public Enumerable<Object[]> scan(DataContext dataContext) {
+    return null;
+  }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
new file mode 100644
index 0000000..1997d1f
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.context;
+
+import java.util.Map;
+
+
+/**
+ * Holds the contextual information collected while planning a query.
+ *
+ * TODO: the planner context is currently unused because query options and query rewrites are not supported yet.
+ * It is a placeholder for the options parsed out of the query.
+ */
+public class PlannerContext {
+  // Parsed query options; stays null until setOptions() is called.
+  private Map<String, String> _options;
+
+  /** Stores the given options map as-is (no copy is made). */
+  public void setOptions(Map<String, String> options) {
+    this._options = options;
+  }
+
+  /** Returns the options map last passed to {@link #setOptions(Map)}, or null if it was never called. */
+  public Map<String, String> getOptions() {
+    Map<String, String> current = this._options;
+    return current;
+  }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java
new file mode 100644
index 0000000..fc75efb
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java
@@ -0,0 +1,502 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.parser;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.fun.SqlCase;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.sql.parsers.SqlCompilationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Calcite parser to convert SQL expressions into {@link Expression}.
+ *
+ * <p>This class is extracted from {@link org.apache.pinot.sql.parsers.CalciteSqlParser}. It only contains the
+ * {@link Expression}-related logic, which is used for ingestion and query rewrite.
+ */
+public class CalciteExpressionParser {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CalciteExpressionParser.class);
+
+  private CalciteExpressionParser() {
+    // Static helpers only; this class is never instantiated.
+  }
+
+  /**
+   * Returns the operands of {@code function} in order, replacing every {@code AS} operand with its left-hand (aliased)
+   * expression.
+   */
+  private static List<Expression> getAliasLeftExpressionsFromDistinctExpression(Function function) {
+    List<Expression> operandList = function.getOperands();
+    List<Expression> result = new ArrayList<>(operandList.size());
+    for (Expression operand : operandList) {
+      result.add(isAsFunction(operand) ? operand.getFunctionCall().getOperands().get(0) : operand);
+    }
+    return result;
+  }
+
+  /**
+   * Returns whether {@code expression} is a function call recognized by
+   * {@link AggregationFunctionType#getAggregationFunctionType(String)}, or has such a call anywhere among its nested
+   * function operands.
+   */
+  public static boolean isAggregateExpression(Expression expression) {
+    Function functionCall = expression.getFunctionCall();
+    if (functionCall == null) {
+      return false;
+    }
+    try {
+      AggregationFunctionType.getAggregationFunctionType(functionCall.getOperator());
+      return true;
+    } catch (IllegalArgumentException ignored) {
+      // Operator is not an aggregation function name; fall through and inspect the operands.
+    }
+    // getOperandsSize() guards the loop below against a missing operand list.
+    if (functionCall.getOperandsSize() == 0) {
+      return false;
+    }
+    for (Expression operand : functionCall.getOperands()) {
+      if (isAggregateExpression(operand)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Returns whether {@code expression} is a function call whose operator is {@code AS}, ignoring case. */
+  public static boolean isAsFunction(Expression expression) {
+    Function call = expression.getFunctionCall();
+    return call != null && call.getOperator().equalsIgnoreCase("AS");
+  }
+
+  /**
+   * Extracts all the identifiers from the given expressions, recursing into function operands.
+   *
+   * @param expressions expressions to scan.
+   * @param excludeAs if true, ignores the right side identifier (the alias) of AS functions.
+   * @return all the identifier names.
+   */
+  public static Set<String> extractIdentifiers(List<Expression> expressions, boolean excludeAs) {
+    Set<String> identifiers = new HashSet<>();
+    for (Expression expression : expressions) {
+      if (expression.getIdentifier() != null) {
+        identifiers.add(expression.getIdentifier().getName());
+        continue;
+      }
+      Function call = expression.getFunctionCall();
+      if (call == null) {
+        continue;
+      }
+      if (excludeAs && call.getOperator().equalsIgnoreCase("AS")) {
+        // Only the aliased (left) operand is scanned; the alias on the right is skipped.
+        identifiers.addAll(extractIdentifiers(Arrays.asList(call.getOperands().get(0)), true));
+      } else {
+        identifiers.addAll(extractIdentifiers(call.getOperands(), excludeAs));
+      }
+    }
+    return identifiers;
+  }
+
+  /**
+   * Compiles a String expression into {@link Expression}, parsing with {@code ParserUtils.PARSER_CONFIG}.
+   *
+   * @param expression String expression.
+   * @return {@link Expression} equivalent of the string.
+   *
+   * @throws SqlCompilationException if String is not a valid expression.
+   */
+  public static Expression compileToExpression(String expression) {
+    SqlNode parsedNode;
+    try {
+      parsedNode = SqlParser.create(expression, ParserUtils.PARSER_CONFIG).parseExpression();
+    } catch (SqlParseException e) {
+      throw new SqlCompilationException("Caught exception while parsing expression: " + expression, e);
+    }
+    return toExpression(parsedNode);
+  }
+
+  /** Wraps the whole DISTINCT select list into a single-element list holding one DISTINCT function expression. */
+  private static List<Expression> convertDistinctSelectList(SqlNodeList selectList) {
+    Expression distinctFunction = convertDistinctAndSelectListToFunctionExpression(selectList);
+    List<Expression> selectExpr = new ArrayList<>(1);
+    selectExpr.add(distinctFunction);
+    return selectExpr;
+  }
+
+  /** Converts each select-list item, in order, into an {@link Expression}. */
+  private static List<Expression> convertSelectList(SqlNodeList selectList) {
+    List<Expression> selectExpr = new ArrayList<>();
+    for (SqlNode item : selectList) {
+      selectExpr.add(toExpression(item));
+    }
+    return selectExpr;
+  }
+
+  /** Converts each ORDER BY item, in order, into an ASC/DESC function expression. */
+  private static List<Expression> convertOrderByList(SqlNodeList orderList) {
+    List<Expression> orderByExpr = new ArrayList<>();
+    for (SqlNode item : orderList) {
+      orderByExpr.add(convertOrderBy(item));
+    }
+    return orderByExpr;
+  }
+
+  /**
+   * Converts one ORDER BY item into a {@code DESC(...)} function for descending items, and into {@code ASC(...)}
+   * for every other kind of node.
+   */
+  private static Expression convertOrderBy(SqlNode node) {
+    switch (node.getKind()) {
+      case DESCENDING: {
+        SqlNode sortKey = ((SqlBasicCall) node).getOperandList().get(0);
+        Expression descending = RequestUtils.getFunctionExpression("DESC");
+        descending.getFunctionCall().addToOperands(toExpression(sortKey));
+        return descending;
+      }
+      default: {
+        // Identifiers and any other node without an explicit DESC are treated as ascending.
+        Expression ascending = RequestUtils.getFunctionExpression("ASC");
+        ascending.getFunctionCall().addToOperands(toExpression(node));
+        return ascending;
+      }
+    }
+  }
+
+  /**
+   * DISTINCT is implemented as an aggregation function, so the select list items are folded into a single DISTINCT
+   * function expression for handing over to the execution engine, either as a PinotQuery or as a BrokerRequest via
+   * conversion.
+   *
+   * @param selectList select list items
+   * @return DISTINCT function expression
+   * @throws SqlCompilationException if an item is {@code *} or an aggregation function call.
+   */
+  private static Expression convertDistinctAndSelectListToFunctionExpression(SqlNodeList selectList) {
+    Expression distinct = RequestUtils.getFunctionExpression(AggregationFunctionType.DISTINCT.getName());
+    for (SqlNode item : selectList) {
+      Expression column = toExpression(item);
+      ExpressionType columnType = column.getType();
+      if (columnType == ExpressionType.IDENTIFIER && column.getIdentifier().getName().equals("*")) {
+        throw new SqlCompilationException(
+            "Syntax error: Pinot currently does not support DISTINCT with *. Please specify each column name after "
+                + "DISTINCT keyword");
+      }
+      if (columnType == ExpressionType.FUNCTION
+          && AggregationFunctionType.isAggregationFunction(column.getFunctionCall().getOperator())) {
+        throw new SqlCompilationException("Syntax error: Use of DISTINCT with aggregation functions is not supported");
+      }
+      distinct.getFunctionCall().addToOperands(column);
+    }
+    return distinct;
+  }
+
+ private static Expression toExpression(SqlNode node) {
+ LOGGER.debug("Current processing SqlNode: {}, node.getKind(): {}", node,
node.getKind());
+ switch (node.getKind()) {
+ case IDENTIFIER:
+ if (((SqlIdentifier) node).isStar()) {
+ return RequestUtils.getIdentifierExpression("*");
+ }
+ if (((SqlIdentifier) node).isSimple()) {
+ return RequestUtils.getIdentifierExpression(((SqlIdentifier)
node).getSimple());
+ }
+ return RequestUtils.getIdentifierExpression(node.toString());
+ case LITERAL:
+ return RequestUtils.getLiteralExpression((SqlLiteral) node);
+ case AS:
+ SqlBasicCall asFuncSqlNode = (SqlBasicCall) node;
+ List<SqlNode> operands = asFuncSqlNode.getOperandList();
+ Expression leftExpr = toExpression(operands.get(0));
+ SqlNode aliasSqlNode = operands.get(1);
+ String aliasName;
+ switch (aliasSqlNode.getKind()) {
+ case IDENTIFIER:
+ aliasName = ((SqlIdentifier) aliasSqlNode).getSimple();
+ break;
+ case LITERAL:
+ aliasName = ((SqlLiteral) aliasSqlNode).toValue();
+ break;
+ default:
+ throw new SqlCompilationException("Unsupported Alias sql node - "
+ aliasSqlNode);
+ }
+ Expression rightExpr = RequestUtils.getIdentifierExpression(aliasName);
+ // Just return left identifier if both sides are the same identifier.
+ if (leftExpr.isSetIdentifier() && rightExpr.isSetIdentifier()) {
+ if
(leftExpr.getIdentifier().getName().equals(rightExpr.getIdentifier().getName()))
{
+ return leftExpr;
+ }
+ }
+ final Expression asFuncExpr =
RequestUtils.getFunctionExpression(SqlKind.AS.toString());
+ asFuncExpr.getFunctionCall().addToOperands(leftExpr);
+ asFuncExpr.getFunctionCall().addToOperands(rightExpr);
+ return asFuncExpr;
+ case CASE:
+ // CASE WHEN Statement is model as a function with variable length
parameters.
+ // Assume N is number of WHEN Statements, total number of parameters
is (2 * N + 1).
+ // - N: Convert each WHEN Statement into a function Expression;
+ // - N: Convert each THEN Statement into an Expression;
+ // - 1: Convert ELSE Statement into an Expression.
+ SqlCase caseSqlNode = (SqlCase) node;
+ SqlNodeList whenOperands = caseSqlNode.getWhenOperands();
+ SqlNodeList thenOperands = caseSqlNode.getThenOperands();
+ SqlNode elseOperand = caseSqlNode.getElseOperand();
+ Expression caseFuncExpr =
RequestUtils.getFunctionExpression(SqlKind.CASE.name());
+ for (SqlNode whenSqlNode : whenOperands.getList()) {
+ Expression whenExpression = toExpression(whenSqlNode);
+ if (isAggregateExpression(whenExpression)) {
+ throw new SqlCompilationException(
+ "Aggregation functions inside WHEN Clause is not supported - "
+ whenSqlNode);
+ }
+ caseFuncExpr.getFunctionCall().addToOperands(whenExpression);
+ }
+ for (SqlNode thenSqlNode : thenOperands.getList()) {
+ Expression thenExpression = toExpression(thenSqlNode);
+ if (isAggregateExpression(thenExpression)) {
+ throw new SqlCompilationException(
+ "Aggregation functions inside THEN Clause is not supported - "
+ thenSqlNode);
+ }
+ caseFuncExpr.getFunctionCall().addToOperands(thenExpression);
+ }
+ Expression elseExpression = toExpression(elseOperand);
+ if (isAggregateExpression(elseExpression)) {
+ throw new SqlCompilationException(
+ "Aggregation functions inside ELSE Clause is not supported - " +
elseExpression);
+ }
+ caseFuncExpr.getFunctionCall().addToOperands(elseExpression);
+ return caseFuncExpr;
+ default:
+ if (node instanceof SqlDataTypeSpec) {
+ // This is to handle expression like: CAST(col AS INT)
+ return RequestUtils.getLiteralExpression(((SqlDataTypeSpec)
node).getTypeName().getSimple());
+ } else {
+ return compileFunctionExpression((SqlBasicCall) node);
+ }
+ }
+ }
+
+ private static Expression compileFunctionExpression(SqlBasicCall
functionNode) {
+ SqlKind functionKind = functionNode.getKind();
+ String functionName;
+ switch (functionKind) {
+ case AND:
+ return compileAndExpression(functionNode);
+ case OR:
+ return compileOrExpression(functionNode);
+ case COUNT:
+ SqlLiteral functionQuantifier = functionNode.getFunctionQuantifier();
+ if (functionQuantifier != null &&
functionQuantifier.toValue().equalsIgnoreCase("DISTINCT")) {
+ functionName = AggregationFunctionType.DISTINCTCOUNT.name();
+ } else {
+ functionName = AggregationFunctionType.COUNT.name();
+ }
+ break;
+ case OTHER:
+ case OTHER_FUNCTION:
+ case DOT:
+ functionName = functionNode.getOperator().getName().toUpperCase();
+ if (functionName.equals("ITEM") || functionName.equals("DOT")) {
+ // Calcite parses path expression such as "data[0][1].a.b[0]" into a
chain of ITEM and/or DOT
+ // functions. Collapse this chain into an identifier.
+ StringBuffer path = new StringBuffer();
+ compilePathExpression(functionName, functionNode, path);
+ return RequestUtils.getIdentifierExpression(path.toString());
+ }
+ break;
+ default:
+ functionName = functionKind.name();
+ break;
+ }
+ // When there is no argument, set an empty list as the operands
+ List<SqlNode> childNodes = functionNode.getOperandList();
+ List<Expression> operands = new ArrayList<>(childNodes.size());
+ for (SqlNode childNode : childNodes) {
+ if (childNode instanceof SqlNodeList) {
+ for (SqlNode node : (SqlNodeList) childNode) {
+ operands.add(toExpression(node));
+ }
+ } else {
+ operands.add(toExpression(childNode));
+ }
+ }
+ validateFunction(functionName, operands);
+ Expression functionExpression =
RequestUtils.getFunctionExpression(functionName);
+ functionExpression.getFunctionCall().setOperands(operands);
+ return functionExpression;
+ }
+
+ /**
+ * Convert Calcite operator tree made up of ITEM and DOT functions to an
identifier. For example, the operator tree
+ * shown below will be converted to IDENTIFIER
"jsoncolumn.data[0][1].a.b[0]".
+ *
+ * ├── ITEM(jsoncolumn.data[0][1].a.b[0])
+ * ├── LITERAL (0)
+ * └── DOT (jsoncolumn.daa[0][1].a.b)
+ * ├── IDENTIFIER (b)
+ * └── DOT (jsoncolumn.data[0][1].a)
+ * ├── IDENTIFIER (a)
+ * └── ITEM (jsoncolumn.data[0][1])
+ * ├── LITERAL (1)
+ * └── ITEM (jsoncolumn.data[0])
+ * ├── LITERAL (1)
+ * └── IDENTIFIER (jsoncolumn.data)
+ *
+ * @param functionName Name of the function ("DOT" or "ITEM")
+ * @param functionNode Root node of the DOT and/or ITEM operator function
chain.
+ * @param path String representation of path represented by DOT and/or ITEM
function chain.
+ */
+ private static void compilePathExpression(String functionName, SqlBasicCall
functionNode, StringBuffer path) {
+ List<SqlNode> operands = functionNode.getOperandList();
+
+ // Compile first operand of the function (either an identifier or another
DOT and/or ITEM function).
+ SqlKind kind0 = operands.get(0).getKind();
+ if (kind0 == SqlKind.IDENTIFIER) {
+ path.append(operands.get(0).toString());
+ } else if (kind0 == SqlKind.DOT || kind0 == SqlKind.OTHER_FUNCTION) {
+ SqlBasicCall function0 = (SqlBasicCall) operands.get(0);
+ String name0 = function0.getOperator().getName();
+ if (name0.equals("ITEM") || name0.equals("DOT")) {
+ compilePathExpression(name0, function0, path);
+ } else {
+ throw new SqlCompilationException("SELECT list item has bad path
expression.");
+ }
+ } else {
+ throw new SqlCompilationException("SELECT list item has bad path
expression.");
+ }
+
+ // Compile second operand of the function (either an identifier or
literal).
+ SqlKind kind1 = operands.get(1).getKind();
+ if (kind1 == SqlKind.IDENTIFIER) {
+ path.append(".").append(((SqlIdentifier) operands.get(1)).getSimple());
+ } else if (kind1 == SqlKind.LITERAL) {
+ path.append("[").append(((SqlLiteral)
operands.get(1)).toValue()).append("]");
+ } else {
+ throw new SqlCompilationException("SELECT list item has bad path
expression.");
+ }
+ }
+
+ public static String canonicalize(String functionName) {
+ return StringUtils.remove(functionName, '_').toLowerCase();
+ }
+
+ public static boolean isSameFunction(String function1, String function2) {
+ return canonicalize(function1).equals(canonicalize(function2));
+ }
+
+ private static void validateFunction(String functionName, List<Expression>
operands) {
+ switch (canonicalize(functionName)) {
+ case "jsonextractscalar":
+ validateJsonExtractScalarFunction(operands);
+ break;
+ case "jsonextractkey":
+ validateJsonExtractKeyFunction(operands);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private static void validateJsonExtractScalarFunction(List<Expression>
operands) {
+ int numOperands = operands.size();
+
+ // Check that there are exactly 3 or 4 arguments
+ if (numOperands != 3 && numOperands != 4) {
+ throw new SqlCompilationException(
+ "Expect 3 or 4 arguments for transform function:
jsonExtractScalar(jsonFieldName, 'jsonPath', "
+ + "'resultsType', ['defaultValue'])");
+ }
+ if (!operands.get(1).isSetLiteral() || !operands.get(2).isSetLiteral() ||
(numOperands == 4 && !operands.get(3)
+ .isSetLiteral())) {
+ throw new SqlCompilationException(
+ "Expect the 2nd/3rd/4th argument of transform function:
jsonExtractScalar(jsonFieldName, 'jsonPath',"
+ + " 'resultsType', ['defaultValue']) to be a single-quoted
literal value.");
+ }
+ }
+
+ private static void validateJsonExtractKeyFunction(List<Expression>
operands) {
+ // Check that there are exactly 2 arguments
+ if (operands.size() != 2) {
+ throw new SqlCompilationException(
+ "Expect 2 arguments are required for transform function:
jsonExtractKey(jsonFieldName, 'jsonPath')");
+ }
+ if (!operands.get(1).isSetLiteral()) {
+ throw new SqlCompilationException(
+ "Expect the 2nd argument for transform function:
jsonExtractKey(jsonFieldName, 'jsonPath') to be a "
+ + "single-quoted literal value.");
+ }
+ }
+
+ /**
+ * Helper method to flatten the operands for the AND expression.
+ */
+ private static Expression compileAndExpression(SqlBasicCall andNode) {
+ List<Expression> operands = new ArrayList<>();
+ for (SqlNode childNode : andNode.getOperandList()) {
+ if (childNode.getKind() == SqlKind.AND) {
+ Expression childAndExpression = compileAndExpression((SqlBasicCall)
childNode);
+ operands.addAll(childAndExpression.getFunctionCall().getOperands());
+ } else {
+ operands.add(toExpression(childNode));
+ }
+ }
+ Expression andExpression =
RequestUtils.getFunctionExpression(SqlKind.AND.name());
+ andExpression.getFunctionCall().setOperands(operands);
+ return andExpression;
+ }
+
+ /**
+ * Helper method to flatten the operands for the OR expression.
+ */
+ private static Expression compileOrExpression(SqlBasicCall orNode) {
+ List<Expression> operands = new ArrayList<>();
+ for (SqlNode childNode : orNode.getOperandList()) {
+ if (childNode.getKind() == SqlKind.OR) {
+ Expression childAndExpression = compileOrExpression((SqlBasicCall)
childNode);
+ operands.addAll(childAndExpression.getFunctionCall().getOperands());
+ } else {
+ operands.add(toExpression(childNode));
+ }
+ }
+ Expression andExpression =
RequestUtils.getFunctionExpression(SqlKind.OR.name());
+ andExpression.getFunctionCall().setOperands(operands);
+ return andExpression;
+ }
+
+ public static boolean isLiteralOnlyExpression(Expression e) {
+ if (e.getType() == ExpressionType.LITERAL) {
+ return true;
+ }
+ if (e.getType() == ExpressionType.FUNCTION) {
+ Function functionCall = e.getFunctionCall();
+ if (functionCall.getOperator().equalsIgnoreCase(SqlKind.AS.toString())) {
+ return isLiteralOnlyExpression(functionCall.getOperands().get(0));
+ }
+ return false;
+ }
+ return false;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteSqlParser.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteSqlParser.java
new file mode 100644
index 0000000..d67896f
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteSqlParser.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.parser;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.sql.parsers.SqlCompilationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provide API to parse a SQL string into Pinot query {@link
SqlNode}.
+ *
+ * <p>This class is extracted from {@link
org.apache.pinot.sql.parsers.CalciteSqlParser}. It contains the logic
+ * to parsed SQL into {@link SqlNode} and use {@link QueryRewriter} to rewrite
the query with Pinot specific
+ * contextual info.
+ */
+public class CalciteSqlParser {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CalciteSqlParser.class);
+
+ private CalciteSqlParser() {
+ // do not instantiate.
+ }
+
+ /**
+ * entrypoint for Sql Parser.
+ */
+ public static SqlNode compile(String sql, PlannerContext plannerContext)
+ throws SqlCompilationException {
+ // Extract OPTION statements from sql as Calcite Parser doesn't parse it.
+ // TODO: use parser syntax extension instead.
+ Map<String, String> options = parseOptions(extractOptionsFromSql(sql));
+ plannerContext.setOptions(options);
+ if (!options.isEmpty()) {
+ sql = removeOptionsFromSql(sql);
+ }
+ // Compile Sql without OPTION statements.
+ SqlNode parsed = parse(sql);
+
+ // query rewrite.
+ return QueryRewriter.rewrite(parsed, plannerContext);
+ }
+
+ // ==========================================================================
+ // Static utils to parse the SQL.
+ // ==========================================================================
+
+ private static Map<String, String> parseOptions(List<String>
optionsStatements) {
+ if (optionsStatements.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ Map<String, String> options = new HashMap<>();
+ for (String optionsStatement : optionsStatements) {
+ for (String option : optionsStatement.split(",")) {
+ final String[] splits = option.split("=");
+ if (splits.length != 2) {
+ throw new SqlCompilationException("OPTION statement requires two
parts separated by '='");
+ }
+ options.put(splits[0].trim(), splits[1].trim());
+ }
+ }
+ return options;
+ }
+
+ private static SqlNode parse(String sql) {
+ SqlParser sqlParser = SqlParser.create(sql, ParserUtils.PARSER_CONFIG);
+ SqlNode sqlNode;
+ try {
+ sqlNode = sqlParser.parseQuery();
+ } catch (SqlParseException e) {
+ throw new SqlCompilationException("Caught exception while parsing query:
" + sql, e);
+ }
+
+ // This is a special rewrite,
+ // TODO: move it to planner later.
+ SqlSelect selectNode;
+ if (sqlNode instanceof SqlOrderBy) {
+ // Store order-by info into the select sql node
+ SqlOrderBy orderByNode = (SqlOrderBy) sqlNode;
+ selectNode = (SqlSelect) orderByNode.query;
+ selectNode.setOrderBy(orderByNode.orderList);
+ selectNode.setFetch(orderByNode.fetch);
+ selectNode.setOffset(orderByNode.offset);
+ } else {
+ selectNode = (SqlSelect) sqlNode;
+ }
+ return selectNode;
+ }
+
+ private static List<String> extractOptionsFromSql(String sql) {
+ List<String> results = new ArrayList<>();
+ Matcher matcher = ParserUtils.OPTIONS_REGEX_PATTEN.matcher(sql);
+ while (matcher.find()) {
+ results.add(matcher.group(1));
+ }
+ return results;
+ }
+
+ private static String removeOptionsFromSql(String sql) {
+ Matcher matcher = ParserUtils.OPTIONS_REGEX_PATTEN.matcher(sql);
+ return matcher.replaceAll("");
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/ParserUtils.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/ParserUtils.java
new file mode 100644
index 0000000..5422382
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/ParserUtils.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.parser;
+
+import java.util.regex.Pattern;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.babel.SqlBabelParserImpl;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
+
+
+/**
+ * Utility provided to Calcite parser.
+ *
+ * <p>This class is extracted from {@link
org.apache.pinot.sql.parsers.CalciteSqlParser} for its static constructs.
+ */
+final class ParserUtils {
+ /** Lexical policy similar to MySQL with ANSI_QUOTES option enabled. (To be
+ * precise: MySQL on Windows; MySQL on Linux uses case-sensitive matching,
+ * like the Linux file system.) The case of identifiers is preserved whether
+ * or not they quoted; after which, identifiers are matched
+ * case-insensitively. Double quotes allow identifiers to contain
+ * non-alphanumeric characters. */
+ static final Lex PINOT_LEX = Lex.MYSQL_ANSI;
+
+ // BABEL is a very liberal conformance value that allows anything supported
by any dialect
+ static final SqlParser.Config PARSER_CONFIG =
+
SqlParser.configBuilder().setLex(PINOT_LEX).setConformance(SqlConformanceEnum.BABEL)
+ .setParserFactory(SqlBabelParserImpl.FACTORY).build();
+
+ // TODO: move this to use parser syntax extension.
+ // To Keep the backward compatibility with 'OPTION' Functionality in PQL,
which is used to
+ // provide more hints for query processing.
+ //
+ // PQL syntax is: `OPTION (<key> = <value>)`
+ //
+ // Multiple OPTIONs is also supported by:
+ // either
+ // `OPTION (<k1> = <v1>, <k2> = <v2>, <k3> = <v3>)`
+ // or
+ // `OPTION (<k1> = <v1>) OPTION (<k2> = <v2>) OPTION (<k3> = <v3>)`
+ static final Pattern OPTIONS_REGEX_PATTEN =
Pattern.compile("option\\s*\\(([^\\)]+)\\)", Pattern.CASE_INSENSITIVE);
+
+ private ParserUtils() {
+ // do not instantiate.
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/QueryRewriter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/QueryRewriter.java
new file mode 100644
index 0000000..8cb0060
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/QueryRewriter.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.parser;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.pinot.query.context.PlannerContext;
+
+/**
+ * Rewrites query based on user option as well as built-in rules.
+ */
+public class QueryRewriter {
+
+ private QueryRewriter() {
+ // do not instantiate.
+ }
+
+ /**
+ * Entrypoint to execute the query rewrite.
+ *
+ * It should be functionally identical to running {@link
org.apache.pinot.sql.parsers.rewriter.QueryRewriter}.
+ * But it operates on a {@link SqlNode} tree instead of a flat pinot query
object.
+ *
+ * @param sqlNodeRoot root of the sqlNode tree
+ * @param plannerContext planner context
+ * @return rewritten sqlNode.
+ */
+ public static SqlNode rewrite(SqlNode sqlNodeRoot, PlannerContext
plannerContext) {
+ return sqlNodeRoot;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/LogicalPlanner.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/LogicalPlanner.java
new file mode 100644
index 0000000..9844916
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/LogicalPlanner.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgram;
+
+
+/**
+ * The {@code LogicalPlanner} is an extended implementation of the Calcite's
{@link HepPlanner}.
+ */
+public class LogicalPlanner extends HepPlanner {
+
+ private List<RelTraitDef> _traitDefs;
+
+ public LogicalPlanner(HepProgram program, Context context) {
+ super(program, context);
+ _traitDefs = new ArrayList();
+ }
+
+ @Override
+ public boolean addRelTraitDef(RelTraitDef relTraitDef) {
+ return !_traitDefs.contains(relTraitDef) && _traitDefs.add(relTraitDef);
+ }
+
+ @Override
+ public List<RelTraitDef> getRelTraitDefs() {
+ return _traitDefs;
+ }
+
+ @Override
+ public RelTraitSet emptyTraitSet() {
+ RelTraitSet traitSet = super.emptyTraitSet();
+ for (RelTraitDef traitDef : _traitDefs) {
+ if (traitDef.multiple()) {
+ // not supported
+ }
+ traitSet = traitSet.plus(traitDef.getDefault());
+ }
+ return traitSet;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
new file mode 100644
index 0000000..bdaba4a
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.util.Map;
+import org.apache.pinot.query.planner.nodes.StageNode;
+
+
+/**
+ * The {@code QueryPlan} is the dispatchable query execution plan from the
result of {@link LogicalPlanner}.
+ *
+ * <p>QueryPlan should contain the necessary stage boundary information and
the cross exchange information
+ * for:
+ * <ul>
+ * <li>dispatch individual stages to executor.</li>
+ * <li>instruct stage executor to establish connection channels to other
stages.</li>
+ * <li>encode data blocks for transfer between stages based on partitioning
scheme.</li>
+ * </ul>
+ */
+public class QueryPlan {
+ private Map<String, StageNode> _queryStageMap;
+ private Map<String, StageMetadata> _stageMetadataMap;
+
+ public QueryPlan(Map<String, StageNode> queryStageMap, Map<String,
StageMetadata> stageMetadataMap) {
+ _queryStageMap = queryStageMap;
+ _stageMetadataMap = stageMetadataMap;
+ }
+
+ /**
+ * Get the map between stageID and the stage plan root node.
+ * @return stage plan map.
+ */
+ public Map<String, StageNode> getQueryStageMap() {
+ return _queryStageMap;
+ }
+
+ /**
+ * Get the stage metadata information.
+ * @return stage metadata info.
+ */
+ public Map<String, StageMetadata> getStageMetadataMap() {
+ return _stageMetadataMap;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
new file mode 100644
index 0000000..d167521
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.pinot.query.planner.nodes.CalcNode;
+import org.apache.pinot.query.planner.nodes.JoinNode;
+import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.planner.nodes.TableScanNode;
+
+
+/**
+ * The {@code StageNodeConverter} converts a logical {@link RelNode} to a
{@link StageNode}.
+ */
+public final class RelToStageConverter {
+
+ private RelToStageConverter() {
+ // do not instantiate.
+ }
+
+ /**
+ * convert a normal relation node into stage node with just the expression
piece.
+ *
+ * TODO: we should convert this to a more structured pattern once we
determine the serialization format used.
+ *
+ * @param node relational node
+ * @return stage node.
+ */
+ public static StageNode toStageNode(RelNode node, String currentStageId) {
+ if (node instanceof LogicalCalc) {
+ return convertLogicalCal((LogicalCalc) node, currentStageId);
+ } else if (node instanceof LogicalTableScan) {
+ return convertLogicalTableScan((LogicalTableScan) node, currentStageId);
+ } else if (node instanceof LogicalJoin) {
+ return convertLogicalJoin((LogicalJoin) node, currentStageId);
+ } else {
+ throw new UnsupportedOperationException("Unsupported logical plan node:
" + node);
+ }
+ }
+
+ private static StageNode convertLogicalTableScan(LogicalTableScan node,
String currentStageId) {
+ return new TableScanNode(node, currentStageId);
+ }
+
+ private static StageNode convertLogicalCal(LogicalCalc node, String
currentStageId) {
+ return new CalcNode(node, currentStageId);
+ }
+
+ private static StageNode convertLogicalJoin(LogicalJoin node, String
currentStageId) {
+ return new JoinNode(node, currentStageId);
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
new file mode 100644
index 0000000..91eb5b7
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.planner.nodes.TableScanNode;
+
+
+/**
+ * The {@code StageMetadata} info contains the information for dispatching a
particular stage.
+ *
+ * <p>It contains information aboute:
+ * <ul>
+ * <li>the tables it is suppose to scan for</li>
+ * <li>the underlying segments a stage requires to execute upon.</li>
+ * <li>the server instances to which this stage should be execute on</li>
+ * </ul>
+ */
+public class StageMetadata implements Serializable {
+ private List<String> _scannedTables;
+
+ // used for assigning server/worker nodes.
+ private List<ServerInstance> _serverInstances;
+
+ // used for table scan stage.
+ private Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+
+ public StageMetadata() {
+ _scannedTables = new ArrayList<>();
+ _serverInstances = new ArrayList<>();
+ _serverInstanceToSegmentsMap = new HashMap<>();
+ }
+
+ public void attach(StageNode stageNode) {
+ if (stageNode instanceof TableScanNode) {
+ _scannedTables.add(((TableScanNode) stageNode).getTableName().get(0));
+ }
+ }
+
+ public List<String> getScannedTables() {
+ return _scannedTables;
+ }
+
+ // -----------------------------------------------
+ // attached physical plan context.
+ // -----------------------------------------------
+
+ public Map<ServerInstance, List<String>> getServerInstanceToSegmentsMap() {
+ return _serverInstanceToSegmentsMap;
+ }
+
+ public void setServerInstanceToSegmentsMap(Map<ServerInstance, List<String>>
serverInstanceToSegmentsMap) {
+ _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
+ }
+
+ public List<ServerInstance> getServerInstances() {
+ return _serverInstances;
+ }
+
+ public void setServerInstances(List<ServerInstance> serverInstances) {
+ _serverInstances = serverInstances;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java
new file mode 100644
index 0000000..175e77f
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.query.planner.nodes.MailboxReceiveNode;
+import org.apache.pinot.query.planner.nodes.MailboxSendNode;
+import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.routing.WorkerManager;
+
+
+/**
+ * QueryPlanMaker walks top-down from {@link RelRoot} and construct a forest
of trees with {@link StageNode}.
+ *
+ * This class is non-threadsafe. Do not reuse the stage planner for multiple
query plans.
+ */
+public class StagePlanner {
+ private final PlannerContext _plannerContext;
+ private final WorkerManager _workerManager;
+
+ private Map<String, StageNode> _queryStageMap;
+ private Map<String, StageMetadata> _stageMetadataMap;
+ private int _stageIdCounter;
+
+ public StagePlanner(PlannerContext plannerContext, WorkerManager
workerManager) {
+ _plannerContext = plannerContext;
+ _workerManager = workerManager;
+ }
+
+ /**
+ * Construct the dispatchable plan from relational logical plan.
+ *
+ * @param relRoot relational plan root.
+ * @return dispatchable plan.
+ */
+ public QueryPlan makePlan(RelNode relRoot) {
+ // clear the state
+ _queryStageMap = new HashMap<>();
+ _stageMetadataMap = new HashMap<>();
+ _stageIdCounter = 0;
+
+ // walk the plan and create stages.
+ StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId());
+
+ // global root needs to send results back to the ROOT, a.k.a. the client
response node.
+ // the last stage is always a broadcast-gather.
+ StageNode globalReceiverNode =
+ new MailboxReceiveNode("ROOT", globalStageRoot.getStageId(),
RelDistribution.Type.BROADCAST_DISTRIBUTED);
+ StageNode globalSenderNode = new MailboxSendNode(globalStageRoot,
globalReceiverNode.getStageId(),
+ RelDistribution.Type.BROADCAST_DISTRIBUTED);
+ _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode);
+ StageMetadata stageMetadata =
_stageMetadataMap.get(globalSenderNode.getStageId());
+ stageMetadata.attach(globalSenderNode);
+
+ _queryStageMap.put(globalReceiverNode.getStageId(), globalReceiverNode);
+ StageMetadata globalReceivingStageMetadata = new StageMetadata();
+ globalReceivingStageMetadata.attach(globalReceiverNode);
+ _stageMetadataMap.put(globalReceiverNode.getStageId(),
globalReceivingStageMetadata);
+
+ // assign workers to each stage.
+ for (Map.Entry<String, StageMetadata> e : _stageMetadataMap.entrySet()) {
+ _workerManager.assignWorkerToStage(e.getKey(), e.getValue());
+ }
+
+ return new QueryPlan(_queryStageMap, _stageMetadataMap);
+ }
+
+ // non-threadsafe
+ private StageNode walkRelPlan(RelNode node, String currentStageId) {
+ if (isExchangeNode(node)) {
+ // 1. exchangeNode always have only one input, get its input converted
as a new stage root.
+ StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
+ RelDistribution.Type exchangeType = ((LogicalExchange)
node).distribution.getType();
+
+ // 2. make an exchange sender and receiver node pair
+ StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId,
nextStageRoot.getStageId(), exchangeType);
+ StageNode mailboxSender = new MailboxSendNode(nextStageRoot,
mailboxReceiver.getStageId(), exchangeType);
+
+ // 3. put the sender side as a completed stage.
+ _queryStageMap.put(mailboxSender.getStageId(), mailboxSender);
+
+ // 4. return the receiver (this is considered as a "virtual table scan"
node for its parent.
+ return mailboxReceiver;
+ } else {
+ StageNode stageNode = RelToStageConverter.toStageNode(node,
currentStageId);
+ List<RelNode> inputs = node.getInputs();
+ for (RelNode input : inputs) {
+ stageNode.addInput(walkRelPlan(input, currentStageId));
+ }
+ StageMetadata stageMetadata =
_stageMetadataMap.computeIfAbsent(currentStageId, (id) -> new StageMetadata());
+ stageMetadata.attach(stageNode);
+ return stageNode;
+ }
+ }
+
+ private boolean isExchangeNode(RelNode node) {
+ return (node instanceof LogicalExchange);
+ }
+
+ private String getNewStageId() {
+ return String.valueOf(_stageIdCounter++);
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
new file mode 100644
index 0000000..71701df
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public abstract class AbstractStageNode implements StageNode {
+
+ protected final String _stageId;
+ protected final List<StageNode> _inputs;
+
+ public AbstractStageNode(String stageId) {
+ _stageId = stageId;
+ _inputs = new ArrayList<>();
+ }
+
+ @Override
+ public List<StageNode> getInputs() {
+ return _inputs;
+ }
+
+ @Override
+ public void addInput(StageNode stageNode) {
+ _inputs.add(stageNode);
+ }
+
+ @Override
+ public String getStageId() {
+ return _stageId;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
new file mode 100644
index 0000000..4b4ca91
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import org.apache.calcite.rel.logical.LogicalCalc;
+
+
+public class CalcNode extends AbstractStageNode {
+ private final String _expression;
+
+ public CalcNode(LogicalCalc node, String currentStageId) {
+ super(currentStageId);
+ _expression = toExpression(node);
+ }
+
+ public String getExpression() {
+ return _expression;
+ }
+
+ private String toExpression(LogicalCalc node) {
+ // TODO: make it real.
+ return node.getDigest();
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
new file mode 100644
index 0000000..520af69
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+
+
+public class JoinNode extends AbstractStageNode {
+ private final JoinRelType _joinType;
+ private final int _leftOperandIndex;
+ private final int _rightOperandIndex;
+ private final FieldSelectionKeySelector _leftFieldSelectionKeySelector;
+ private final FieldSelectionKeySelector _rightFieldSelectionKeySelector;
+
+ private transient final RelDataType _leftRowType;
+ private transient final RelDataType _rightRowType;
+
+ public JoinNode(LogicalJoin node, String currentStageId) {
+ super(currentStageId);
+ _joinType = node.getJoinType();
+ RexCall joinCondition = (RexCall) node.getCondition();
+ Preconditions.checkState(
+ joinCondition.getOperator().getKind().equals(SqlKind.EQUALS) &&
joinCondition.getOperands().size() == 2,
+ "only equality JOIN is supported");
+ Preconditions.checkState(joinCondition.getOperands().get(0) instanceof
RexInputRef, "only reference supported");
+ Preconditions.checkState(joinCondition.getOperands().get(1) instanceof
RexInputRef, "only reference supported");
+ _leftRowType = node.getLeft().getRowType();
+ _rightRowType = node.getRight().getRowType();
+ _leftOperandIndex = ((RexInputRef)
joinCondition.getOperands().get(0)).getIndex();
+ _rightOperandIndex = ((RexInputRef)
joinCondition.getOperands().get(1)).getIndex();
+ _leftFieldSelectionKeySelector = new
FieldSelectionKeySelector(_leftOperandIndex);
+ _rightFieldSelectionKeySelector =
+ new FieldSelectionKeySelector(_rightOperandIndex -
_leftRowType.getFieldNames().size());
+ }
+
+ public JoinRelType getJoinType() {
+ return _joinType;
+ }
+
+ public RelDataType getLeftRowType() {
+ return _leftRowType;
+ }
+
+ public RelDataType getRightRowType() {
+ return _rightRowType;
+ }
+
+ public int getLeftOperandIndex() {
+ return _leftOperandIndex;
+ }
+
+ public int getRightOperandIndex() {
+ return _rightOperandIndex;
+ }
+
+ public FieldSelectionKeySelector getLeftJoinKeySelector() {
+ return _leftFieldSelectionKeySelector;
+ }
+
+ public FieldSelectionKeySelector getRightJoinKeySelector() {
+ return _rightFieldSelectionKeySelector;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
new file mode 100644
index 0000000..947d449
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+
+
+public class MailboxReceiveNode extends AbstractStageNode {
+
+ private final String _senderStageId;
+ private final RelDistribution.Type _exchangeType;
+
+ public MailboxReceiveNode(String stageId, String senderStageId,
RelDistribution.Type exchangeType) {
+ super(stageId);
+ _senderStageId = senderStageId;
+ _exchangeType = exchangeType;
+ }
+
+ @Override
+ public List<StageNode> getInputs() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void addInput(StageNode stageNode) {
+ throw new UnsupportedOperationException("no input should be added to
mailbox receive.");
+ }
+
+ public String getSenderStageId() {
+ return _senderStageId;
+ }
+
+ public RelDistribution.Type getExchangeType() {
+ return _exchangeType;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
new file mode 100644
index 0000000..6db9578
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+
+
+public class MailboxSendNode extends AbstractStageNode {
+ private final StageNode _stageRoot;
+ private final String _receiverStageId;
+ private final RelDistribution.Type _exchangeType;
+
+ public MailboxSendNode(StageNode stageRoot, String receiverStageId,
RelDistribution.Type exchangeType) {
+ super(stageRoot.getStageId());
+ _stageRoot = stageRoot;
+ _receiverStageId = receiverStageId;
+ _exchangeType = exchangeType;
+ }
+
+ @Override
+ public List<StageNode> getInputs() {
+ return Collections.singletonList(_stageRoot);
+ }
+
+ @Override
+ public void addInput(StageNode queryStageRoot) {
+ throw new UnsupportedOperationException("mailbox cannot be changed!");
+ }
+
+ public String getReceiverStageId() {
+ return _receiverStageId;
+ }
+
+ public RelDistribution.Type getExchangeType() {
+ return _exchangeType;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java
new file mode 100644
index 0000000..8fcbb5e
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.io.Serializable;
+import java.util.List;
+
+
/**
 * A stage node is the serializable counterpart of a {@link org.apache.calcite.rel.RelNode}.
 *
 * TODO: stage nodes currently rely on java.io.Serializable as their serialization format; other formats should be
 * evaluated for better performance. What is needed is a way to keep only the constructed relational content of a
 * RelNode, without its planner context, since the planner is never revisited once a stage has been created.
 */
public interface StageNode extends Serializable {

  /**
   * @return the ID of the stage this node belongs to.
   */
  String getStageId();

  /**
   * @return the input nodes of this node.
   */
  List<StageNode> getInputs();

  /**
   * Adds an input node to this node.
   *
   * @param stageNode the input to add.
   */
  void addInput(StageNode stageNode);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
new file mode 100644
index 0000000..0bb6e0f
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataTypeField;
+
+
+public class TableScanNode extends AbstractStageNode {
+ private final List<String> _tableName;
+ private final List<String> _tableScanColumns;
+
+ public TableScanNode(LogicalTableScan tableScan, String stageId) {
+ super(stageId);
+ _tableName = tableScan.getTable().getQualifiedName();
+ // TODO: optimize this, table field is not directly usable as name.
+ _tableScanColumns =
+
tableScan.getRowType().getFieldList().stream().map(RelDataTypeField::getName).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<StageNode> getInputs() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void addInput(StageNode queryStageRoot) {
+ throw new UnsupportedOperationException("TableScanNode cannot add input as
it is a leaf node");
+ }
+
+ public List<String> getTableName() {
+ return _tableName;
+ }
+
+ public List<String> getTableScanColumns() {
+ return _tableScanColumns;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
new file mode 100644
index 0000000..9e0c776
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.partitioning;
+
+import java.io.Serializable;
+
+
+/**
+ * The {@code FieldSelectionKeySelector} simply extract a column value out
from a row array {@link Object[]}.
+ */
+public class FieldSelectionKeySelector implements KeySelector<Object[],
Object>, Serializable {
+
+ private int _columnIndex;
+
+ public FieldSelectionKeySelector(int columnIndex) {
+ _columnIndex = columnIndex;
+ }
+
+ @Override
+ public Object getKey(Object[] input) {
+ return input[_columnIndex];
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
new file mode 100644
index 0000000..79dc987
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.partitioning;
+
/**
 * A {@code KeySelector} is a partitioning function: it maps an input of a given type to a key.
 *
 * <p>It is used by computations such as GROUP BY and equality JOINs.
 *
 * <p>Implementations must be deterministic: the same input must always yield the same selection hash key.
 *
 * @param <IN> type of the input data.
 * @param <OUT> type of the extracted key.
 */
public interface KeySelector<IN, OUT> {

  /**
   * Derives the key for one input.
   *
   * @param input input data.
   * @return the key of the input data.
   */
  OUT getKey(IN input);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java
new file mode 100644
index 0000000..7ad6ee0
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.routing;
+
+import java.util.Map;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * WorkerInstance is a wrapper around {@link ServerInstance}.
+ *
+ * <p>This can be considered as a simplified version which directly enable
host-port initialization.
+ */
+public class WorkerInstance extends ServerInstance {
+
+ public WorkerInstance(InstanceConfig instanceConfig) {
+ super(instanceConfig);
+ }
+
+ public WorkerInstance(String hostname, int serverPort, int mailboxPort) {
+ super(toInstanceConfig(hostname, serverPort, mailboxPort));
+ }
+
+ private static InstanceConfig toInstanceConfig(String hostname, int
serverPort, int mailboxPort) {
+ String server = String.format("%s_%d", hostname, serverPort);
+ InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(server);
+ ZNRecord znRecord = instanceConfig.getRecord();
+ Map<String, String> simpleFields = znRecord.getSimpleFields();
+ simpleFields.put(CommonConstants.Helix.Instance.GRPC_PORT_KEY,
String.valueOf(mailboxPort));
+ return instanceConfig;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
new file mode 100644
index 0000000..a310258
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.routing;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+
+
+/**
+ * The {@code WorkerManager} manages stage to worker assignment.
+ *
+ * <p>It contains the logic to assign worker to a particular stages. If it is
a leaf stage the logic fallback to
+ * how Pinot server assigned server and server-segment mapping.
+ *
+ * TODO: Currently it is implemented by wrapping routing manager from Pinot
Broker. however we can abstract out
+ * the worker manager later when we split out the query-spi layer.
+ */
+public class WorkerManager {
+ private static final CalciteSqlCompiler CALCITE_SQL_COMPILER = new
CalciteSqlCompiler();
+
+ private final String _hostName;
+ private final int _port;
+ private final RoutingManager _routingManager;
+
+ public WorkerManager(String hostName, int port, RoutingManager
routingManager) {
+ _hostName = hostName;
+ _port = port;
+ _routingManager = routingManager;
+ }
+
+ public void assignWorkerToStage(String stageId, StageMetadata stageMetadata)
{
+ List<String> scannedTables = stageMetadata.getScannedTables();
+ if (scannedTables.size() == 1) { // table scan stage, need to attach
server as well as segment info.
+ RoutingTable routingTable = getRoutingTable(scannedTables.get(0));
+ Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
routingTable.getServerInstanceToSegmentsMap();
+ stageMetadata.setServerInstances(new
ArrayList<>(serverInstanceToSegmentsMap.keySet()));
+ stageMetadata.setServerInstanceToSegmentsMap(new
HashMap<>(serverInstanceToSegmentsMap));
+ } else if (stageId.equalsIgnoreCase("ROOT")) {
+ // ROOT stage doesn't have a QueryServer as it is strictly only reducing
results.
+ // here we simply assign the worker instance with identical
server/mailbox port number.
+ stageMetadata.setServerInstances(Lists.newArrayList(new
WorkerInstance(_hostName, _port, _port)));
+ } else {
+
stageMetadata.setServerInstances(filterServers(_routingManager.getEnabledServerInstanceMap().values()));
+ }
+ }
+
+ private static List<ServerInstance> filterServers(Collection<ServerInstance>
servers) {
+ List<ServerInstance> serverInstances = new ArrayList<>();
+ for (ServerInstance server : servers) {
+ String hostname = server.getHostname();
+ if
(!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) &&
!hostname.startsWith(
+ CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE) &&
!hostname.startsWith(
+ CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE) &&
server.getGrpcPort() > 0) {
+ serverInstances.add(server);
+ }
+ }
+ return serverInstances;
+ }
+
+ private RoutingTable getRoutingTable(String tableName) {
+ String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+ // TODO: support both offline and realtime, now we hard code offline table.
+ String tableNameWithType =
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
+ return
_routingManager.getRoutingTable(CALCITE_SQL_COMPILER.compileToBrokerRequest(
+ "SELECT * FROM " + tableNameWithType));
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
new file mode 100644
index 0000000..2b35613
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.rules;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+
+/**
+ * Special rule for Pinot, always insert exchange after JOIN
+ */
+public class PinotExchangeNodeInsertRule extends RelOptRule {
+ public static final PinotExchangeNodeInsertRule INSTANCE =
+ new PinotExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER);
+
+ public PinotExchangeNodeInsertRule(RelBuilderFactory factory) {
+ super(operand(LogicalJoin.class, any()), factory, null);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ if (call.rels.length < 1) {
+ return false;
+ }
+ if (call.rel(0) instanceof Join) {
+ Join join = call.rel(0);
+ return !isExchange(join.getLeft()) && !isExchange(join.getRight());
+ }
+ return false;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ Join join = call.rel(0);
+ RelNode leftInput = join.getInput(0);
+ RelNode rightInput = join.getInput(1);
+
+ RelNode leftExchange = LogicalExchange.create(leftInput,
RelDistributions.SINGLETON);
+ RelNode rightExchange = LogicalExchange.create(rightInput,
RelDistributions.BROADCAST_DISTRIBUTED);
+
+ RelNode newJoinNode =
+ new LogicalJoin(join.getCluster(), join.getTraitSet(), leftExchange,
rightExchange, join.getCondition(),
+ join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
+ ImmutableList.copyOf(join.getSystemFieldList()));
+
+ call.transformTo(newJoinNode);
+ }
+
+ private static boolean isExchange(RelNode rel) {
+ RelNode reference = rel;
+ if (reference instanceof HepRelVertex) {
+ reference = ((HepRelVertex) reference).getCurrentRel();
+ }
+ return reference instanceof Exchange;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
new file mode 100644
index 0000000..1b4e085
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.rules;
+
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.apache.calcite.adapter.enumerable.EnumerableRules;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.rel.rules.PruneEmptyRules;
+
+
+/**
+ * Default rule sets for Pinot query
+ */
+public class PinotQueryRuleSets {
+ private PinotQueryRuleSets() {
+ // do not instantiate.
+ }
+
+ public static final Collection<RelOptRule> LOGICAL_OPT_RULES =
+ Arrays.asList(EnumerableRules.ENUMERABLE_FILTER_RULE,
EnumerableRules.ENUMERABLE_JOIN_RULE,
+ EnumerableRules.ENUMERABLE_PROJECT_RULE,
EnumerableRules.ENUMERABLE_SORT_RULE,
+ EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE,
+
+ // push a filter into a join
+ CoreRules.FILTER_INTO_JOIN,
+ // push filter through an aggregation
+ CoreRules.FILTER_AGGREGATE_TRANSPOSE,
+ // push filter through set operation
+ CoreRules.FILTER_SET_OP_TRANSPOSE,
+ // push project through set operation
+ CoreRules.PROJECT_SET_OP_TRANSPOSE,
+
+ // aggregation and projection rules
+ CoreRules.AGGREGATE_PROJECT_MERGE,
CoreRules.AGGREGATE_PROJECT_PULL_UP_CONSTANTS,
+ // push a projection past a filter or vice versa
+ CoreRules.PROJECT_FILTER_TRANSPOSE,
CoreRules.FILTER_PROJECT_TRANSPOSE,
+ // push a projection to the children of a join
+ // push all expressions to handle the time indicator correctly
+ CoreRules.JOIN_CONDITION_PUSH,
+ // merge projections
+ CoreRules.PROJECT_MERGE,
+ // remove identity project
+ CoreRules.PROJECT_REMOVE,
+ // reorder sort and projection
+ CoreRules.SORT_PROJECT_TRANSPOSE,
+
+ // join rules
+ CoreRules.JOIN_PUSH_EXPRESSIONS,
+
+ // convert non-all union into all-union + distinct
+ CoreRules.UNION_TO_DISTINCT,
+
+ // remove aggregation if it does not aggregate and input is already
distinct
+ CoreRules.AGGREGATE_REMOVE,
+ // push aggregate through join
+ CoreRules.AGGREGATE_JOIN_TRANSPOSE,
+ // aggregate union rule
+ CoreRules.AGGREGATE_UNION_AGGREGATE,
+
+ // reduce aggregate functions like AVG, STDDEV_POP etc.
+ CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
+
+ // remove unnecessary sort rule
+ CoreRules.SORT_REMOVE,
+
+ // prune empty results rules
+ PruneEmptyRules.AGGREGATE_INSTANCE, PruneEmptyRules.FILTER_INSTANCE,
PruneEmptyRules.JOIN_LEFT_INSTANCE,
+ PruneEmptyRules.JOIN_RIGHT_INSTANCE,
PruneEmptyRules.PROJECT_INSTANCE, PruneEmptyRules.SORT_INSTANCE,
+ PruneEmptyRules.UNION_INSTANCE,
+
+ // Pinot specific rules
+ PinotExchangeNodeInsertRule.INSTANCE);
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java
new file mode 100644
index 0000000..55797b0
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.type;
+
+import java.util.Map;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Extends Java-base TypeFactory from Calcite.
+ *
+ * <p>{@link JavaTypeFactoryImpl} is used here because we are not overriding
much of the TypeFactory methods
+ * required by Calcite. We will start extending {@link SqlTypeFactoryImpl} or
even {@link RelDataTypeFactory}
+ * when necessary for Pinot to override such mechanism.
+ *
+ * <p>Noted that {@link JavaTypeFactoryImpl} is subject to change. Please pay
extra attention to this class when
+ * upgrading Calcite versions.
+ */
+public class TypeFactory extends JavaTypeFactoryImpl {
+ private final RelDataTypeSystem _typeSystem;
+
+ public TypeFactory(RelDataTypeSystem typeSystem) {
+ _typeSystem = typeSystem;
+ }
+
+ public RelDataType createRelDataTypeFromSchema(Schema schema) {
+ Builder builder = new Builder(this);
+ for (Map.Entry<String, FieldSpec> e : schema.getFieldSpecMap().entrySet())
{
+ builder.add(e.getKey(), toRelDataType(e.getValue()));
+ }
+ return builder.build();
+ }
+
+ private RelDataType toRelDataType(FieldSpec fieldSpec) {
+ switch (fieldSpec.getDataType()) {
+ case INT:
+ return createSqlType(SqlTypeName.INTEGER);
+ case LONG:
+ return createSqlType(SqlTypeName.BIGINT);
+ case FLOAT:
+ return createSqlType(SqlTypeName.FLOAT);
+ case DOUBLE:
+ return createSqlType(SqlTypeName.DOUBLE);
+ case BOOLEAN:
+ return createSqlType(SqlTypeName.BOOLEAN);
+ case TIMESTAMP:
+ return createSqlType(SqlTypeName.TIMESTAMP);
+ case STRING:
+ return createSqlType(SqlTypeName.VARCHAR);
+ case BYTES:
+ return createSqlType(SqlTypeName.VARBINARY);
+ case JSON:
+ // TODO: support JSON, JSON should be supported using a special
RelDataType as it is not a simple String,
+ // nor can it be easily parsed as a STRUCT.
+ case LIST:
+ // TODO: support LIST, MV column should go fall into this category.
+ case STRUCT:
+ case MAP:
+ default:
+ throw new UnsupportedOperationException("unsupported!");
+ }
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
new file mode 100644
index 0000000..b2606d9
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.type;
+
+import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
+
+
+/**
+ * The {@code TypeSystem} overwrites Calcite type system with Pinot specific
logics.
+ *
+ * TODO: no overwrite for now.
+ */
+public class TypeSystem extends RelDataTypeSystemImpl {
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java
new file mode 100644
index 0000000..f774976
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.validate;
+
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+
+
+/**
+ * The {@code Validator} overwrites Calcite's Validator with Pinot specific
logics.
+ */
+public class Validator extends SqlValidatorImpl {
+
+ public Validator(SqlOperatorTable opTab, SqlValidatorCatalogReader
catalogReader, RelDataTypeFactory typeFactory) {
+ // TODO: support BABEL validator. Currently parser conformance is set to
use BABEL.
+ super(opTab, catalogReader, typeFactory, Config.DEFAULT);
+ }
+}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
new file mode 100644
index 0000000..9114696
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query;
+
+import com.google.common.collect.ImmutableList;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.calcite.jdbc.CalciteSchemaBuilder;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.externalize.RelXmlWriter;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.catalog.PinotCatalog;
+import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.type.TypeSystem;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class QueryEnvironmentTest {
+ private QueryEnvironment _queryEnvironment;
+
+ @BeforeClass
+ public void setUp() {
+ // the port doesn't matter as we are not actually making a server call.
+ RoutingManager routingManager =
QueryEnvironmentTestUtils.getMockRoutingManager(1, 2);
+ _queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
+ CalciteSchemaBuilder.asRootSchema(new
PinotCatalog(QueryEnvironmentTestUtils.mockTableCache())),
+ new WorkerManager("localhost", 3, routingManager));
+ }
+
+ @Test
+ public void testSqlStrings()
+ throws Exception {
+ testQueryParsing("SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3
>= 0",
+ "SELECT *\n" + "FROM `a`\n" + "INNER JOIN `b` ON `a`.`col1` =
`b`.`col2`\n" + "WHERE `a`.`col3` >= 0");
+ }
+
+ @Test
+ public void testQueryToStages()
+ throws Exception {
+ PlannerContext plannerContext = new PlannerContext();
+ String query = "SELECT * FROM a JOIN b ON a.col1 = b.col2";
+ QueryPlan queryPlan = _queryEnvironment.planQuery(query);
+ Assert.assertEquals(queryPlan.getQueryStageMap().size(), 4);
+ Assert.assertEquals(queryPlan.getStageMetadataMap().size(), 4);
+ for (Map.Entry<String, StageMetadata> e :
queryPlan.getStageMetadataMap().entrySet()) {
+ List<String> tables = e.getValue().getScannedTables();
+ if (tables.size() == 1) {
+ // table scan stages; for tableA it should have 2 hosts, for tableB it
should have only 1
+ Assert.assertEquals(
+
e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
+ tables.get(0).equals("a") ? ImmutableList.of("Server_localhost_1",
"Server_localhost_2")
+ : ImmutableList.of("Server_localhost_1"));
+ } else if (!e.getKey().equals("ROOT")) {
+ // join stage should have both servers used.
+ Assert.assertEquals(
+
e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
+ ImmutableList.of("Server_localhost_1", "Server_localhost_2"));
+ } else {
+ // reduce stage should have the reducer instance.
+ Assert.assertEquals(
+
e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
+ ImmutableList.of("Server_localhost_3"));
+ }
+ }
+ }
+
+ @Test
+ public void testQueryToRel()
+ throws Exception {
+ PlannerContext plannerContext = new PlannerContext();
+ String query = "SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >=
0";
+ SqlNode parsed = _queryEnvironment.parse(query, plannerContext);
+ SqlNode validated = _queryEnvironment.validate(parsed);
+ RelRoot relRoot = _queryEnvironment.toRelation(validated, plannerContext);
+ RelNode optimized = _queryEnvironment.optimize(relRoot, plannerContext);
+
+ // Assert that relational plan can be written into a ALL-ATTRIBUTE digest.
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ RelWriter planWriter = new RelXmlWriter(pw,
SqlExplainLevel.ALL_ATTRIBUTES);
+ optimized.explain(planWriter);
+ Assert.assertNotNull(sw.toString());
+ }
+
+ private void testQueryParsing(String query, String digest)
+ throws Exception {
+ PlannerContext plannerContext = new PlannerContext();
+ SqlNode sqlNode = _queryEnvironment.parse(query, plannerContext);
+ _queryEnvironment.validate(sqlNode);
+ Assert.assertEquals(sqlNode.toString(), digest);
+ }
+}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
new file mode 100644
index 0000000..cc0db38
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.calcite.jdbc.CalciteSchemaBuilder;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.catalog.PinotCatalog;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.routing.WorkerInstance;
+import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.type.TypeSystem;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Base query environment test that provides a bunch of mock tables / schemas
so that
+ * we can run a simple query planning, produce stages / metadata for other
components to test.
+ */
+public class QueryEnvironmentTestUtils {
+ public static final Schema.SchemaBuilder SCHEMA_BUILDER;
+ public static final Map<String, List<String>> SERVER1_SEGMENTS =
+ ImmutableMap.of("a", Lists.newArrayList("a1", "a2"), "b",
Lists.newArrayList("b1"), "c",
+ Lists.newArrayList("c1"));
+ public static final Map<String, List<String>> SERVER2_SEGMENTS =
+ ImmutableMap.of("a", Lists.newArrayList("a3"), "c",
Lists.newArrayList("c2", "c3"));
+
+ static {
+ SCHEMA_BUILDER = new
Schema.SchemaBuilder().addSingleValueDimension("col1",
FieldSpec.DataType.STRING, "")
+ .addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
+ .addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH",
"1:HOURS")
+ .addMetric("col3", FieldSpec.DataType.INT, 0);
+ }
+
+ private QueryEnvironmentTestUtils() {
+ // do not instantiate.
+ }
+
+ public static TableCache mockTableCache() {
+ TableCache mock = mock(TableCache.class);
+ when(mock.getTableNameMap()).thenReturn(ImmutableMap.of("a", "a", "b",
"b", "c", "c"));
+
when(mock.getSchema("a")).thenReturn(SCHEMA_BUILDER.setSchemaName("a").build());
+
when(mock.getSchema("b")).thenReturn(SCHEMA_BUILDER.setSchemaName("b").build());
+
when(mock.getSchema("c")).thenReturn(SCHEMA_BUILDER.setSchemaName("c").build());
+ return mock;
+ }
+
+ public static QueryEnvironment getQueryEnvironment(int reducerPort, int
port1, int port2) {
+ RoutingManager routingManager =
QueryEnvironmentTestUtils.getMockRoutingManager(port1, port2);
+ return new QueryEnvironment(new TypeFactory(new TypeSystem()),
+ CalciteSchemaBuilder.asRootSchema(new
PinotCatalog(QueryEnvironmentTestUtils.mockTableCache())),
+ new WorkerManager("localhost", reducerPort, routingManager));
+ }
+
+ public static RoutingManager getMockRoutingManager(int port1, int port2) {
+ String server1 = String.format("localhost_%d", port1);
+ String server2 = String.format("localhost_%d", port2);
+ // this doesn't test the QueryServer functionality so the server port can
be the same as the mailbox port.
+ // this is only use for test identifier purpose.
+ ServerInstance host1 = new WorkerInstance("localhost", port1, port1);
+ ServerInstance host2 = new WorkerInstance("localhost", port2, port2);
+
+ RoutingTable rtA = mock(RoutingTable.class);
+ when(rtA.getServerInstanceToSegmentsMap()).thenReturn(
+ ImmutableMap.of(host1, SERVER1_SEGMENTS.get("a"), host2,
SERVER2_SEGMENTS.get("a")));
+ RoutingTable rtB = mock(RoutingTable.class);
+
when(rtB.getServerInstanceToSegmentsMap()).thenReturn(ImmutableMap.of(host1,
SERVER1_SEGMENTS.get("b")));
+ RoutingTable rtC = mock(RoutingTable.class);
+ when(rtC.getServerInstanceToSegmentsMap()).thenReturn(
+ ImmutableMap.of(host1, SERVER1_SEGMENTS.get("c"), host2,
SERVER2_SEGMENTS.get("c")));
+ Map<String, RoutingTable> mockRoutingTableMap = ImmutableMap.of("a", rtA,
"b", rtB, "c", rtC);
+ RoutingManager mock = mock(RoutingManager.class);
+ when(mock.getRoutingTable(any())).thenAnswer(invocation -> {
+ BrokerRequest brokerRequest = invocation.getArgument(0);
+ String tableName =
brokerRequest.getPinotQuery().getDataSource().getTableName();
+ return
mockRoutingTableMap.get(TableNameBuilder.extractRawTableName(tableName));
+ });
+
when(mock.getEnabledServerInstanceMap()).thenReturn(ImmutableMap.of(server1,
host1, server2, host2));
+ return mock;
+ }
+
+ public static String getTestStageByServerCount(QueryPlan queryPlan, int
serverCount) {
+ List<String> stageIds = queryPlan.getStageMetadataMap().entrySet().stream()
+ .filter(e -> !e.getKey().equals("ROOT") &&
e.getValue().getServerInstances().size() == serverCount)
+ .map(Map.Entry::getKey).collect(Collectors.toList());
+ return stageIds.size() > 0 ? stageIds.get(0) : null;
+ }
+
+ public static int getAvailablePort() {
+ try {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ return socket.getLocalPort();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to find an available port to use", e);
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index eb8cdc2..5ad6908 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,8 @@
<module>pinot-connectors</module>
<module>pinot-segment-local</module>
<module>pinot-compatibility-verifier</module>
+
+ <module>pinot-query-planner</module>
</modules>
<licenses>
@@ -1047,10 +1049,6 @@
<artifactId>commons-dbcp2</artifactId>
</exclusion>
<exclusion>
- <groupId>com.esri.geometry</groupId>
- <artifactId>esri-geometry-api</artifactId>
- </exclusion>
- <exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</exclusion>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]