This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 454a1d8740 [Feature][multistage] Thread-safe query planning (#9344)
454a1d8740 is described below
commit 454a1d87400b3033b72b5a6beb06432134ff3c5e
Author: Yao Liu <[email protected]>
AuthorDate: Fri Sep 9 21:04:55 2022 -0700
[Feature][multistage] Thread-safe query planning (#9344)
* multi-thread query planning
* Use auto-close planner context and fix test
* address style comments
* address javadoc
* multi-thread query planning
* Use auto-close planner context and fix test
* address style comments
* address javadoc
---
.../org/apache/pinot/query/QueryEnvironment.java | 62 +++++++---------------
.../apache/pinot/query/context/PlannerContext.java | 48 +++++++++++++++--
.../apache/pinot/query/QueryCompilationTest.java | 54 +++++++++++++++++++
3 files changed, 118 insertions(+), 46 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index da13d03a35..c1797381ba 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -24,14 +24,12 @@ import java.util.Properties;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.prepare.PlannerImpl;
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
@@ -44,8 +42,6 @@ import org.apache.calcite.sql.SqlExplainFormat;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.sql2rel.StandardConvertletTable;
import org.apache.calcite.tools.FrameworkConfig;
@@ -53,11 +49,9 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.PlannerUtils;
import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.logical.LogicalPlanner;
import org.apache.pinot.query.planner.logical.StagePlanner;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.type.TypeFactory;
-import org.apache.pinot.query.validate.Validator;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
@@ -73,11 +67,10 @@ public class QueryEnvironment {
// Calcite extension/plugins
private final CalciteSchema _rootSchema;
- private final PlannerImpl _planner;
private final Prepare.CatalogReader _catalogReader;
private final RelDataTypeFactory _typeFactory;
- private final RelOptPlanner _relOptPlanner;
- private final SqlValidator _validator;
+
+ private final HepProgram _hepProgram;
// Pinot extensions
private final Collection<RelOptRule> _logicalRuleSet;
@@ -89,15 +82,11 @@ public class QueryEnvironment {
_workerManager = workerManager;
_config = Frameworks.newConfigBuilder().traitDefs().build();
- // Planner is not thread-safe. must be reset() after each use.
- _planner = new PlannerImpl(_config);
-
// catalog
Properties catalogReaderConfigProperties = new Properties();
catalogReaderConfigProperties.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
"true");
_catalogReader = new CalciteCatalogReader(_rootSchema,
_rootSchema.path(null), _typeFactory,
new CalciteConnectionConfigImpl(catalogReaderConfigProperties));
- _validator = new Validator(SqlStdOperatorTable.instance(), _catalogReader,
_typeFactory);
// optimizer rules
_logicalRuleSet = PinotQueryRuleSets.LOGICAL_OPT_RULES;
@@ -107,19 +96,13 @@ public class QueryEnvironment {
for (RelOptRule relOptRule : _logicalRuleSet) {
hepProgramBuilder.addRuleInstance(relOptRule);
}
- _relOptPlanner = new LogicalPlanner(hepProgramBuilder.build(),
Contexts.EMPTY_CONTEXT);
+ _hepProgram = hepProgramBuilder.build();
}
/**
* Plan a SQL query.
*
- * <p>Noted that since Calcite's {@link org.apache.calcite.tools.Planner} is
not threadsafe.
- * Only one query query can be planned at a time. Afterwards planner needs
to be reset in order to clear the
- * state for the next planning.
- *
- * <p>In order for faster planning, we pre-constructed all the planner
objects and use the plan-then-reset
- * model. Thusn when using {@code QueryEnvironment#planQuery(String)},
caller should ensure that no concurrent
- * plan execution occurs.
+ * <p>This method is thread-safe because every call constructs its own {@link PlannerContext} instead of reusing a
+ * shared planner.
*
* TODO: follow benchmark and profile to measure whether it make sense for
the latency-concurrency trade-off
* between reusing plannerImpl vs. create a new planner for each query.
@@ -129,8 +112,7 @@ public class QueryEnvironment {
* @return a dispatchable query plan
*/
public QueryPlan planQuery(String sqlQuery, SqlNodeAndOptions
sqlNodeAndOptions) {
- try {
- PlannerContext plannerContext = new PlannerContext();
+ try (PlannerContext plannerContext = new PlannerContext(_config,
_catalogReader, _typeFactory, _hepProgram)) {
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
RelNode relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(),
plannerContext);
return toDispatchablePlan(relRoot, plannerContext);
@@ -151,14 +133,13 @@ public class QueryEnvironment {
* @return the explained query plan.
*/
public String explainQuery(String sqlQuery, SqlNodeAndOptions
sqlNodeAndOptions) {
- try {
+ try (PlannerContext plannerContext = new PlannerContext(_config,
_catalogReader, _typeFactory, _hepProgram)) {
SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
- PlannerContext plannerContext = new PlannerContext();
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
RelNode relRoot = compileQuery(explain.getExplicandum(), plannerContext);
SqlExplainFormat format = explain.getFormat() == null ?
SqlExplainFormat.DOT : explain.getFormat();
- SqlExplainLevel level = explain.getDetailLevel() == null ?
SqlExplainLevel.DIGEST_ATTRIBUTES
- : explain.getDetailLevel();
+ SqlExplainLevel level =
+ explain.getDetailLevel() == null ? SqlExplainLevel.DIGEST_ATTRIBUTES
: explain.getDetailLevel();
return PlannerUtils.explainPlan(relRoot, format, level);
} catch (Exception e) {
throw new RuntimeException("Error explain query plan for: " + sqlQuery,
e);
@@ -182,20 +163,15 @@ public class QueryEnvironment {
@VisibleForTesting
protected RelNode compileQuery(SqlNode sqlNode, PlannerContext
plannerContext)
throws Exception {
- try {
- SqlNode validated = validate(sqlNode);
- RelRoot relation = toRelation(validated, plannerContext);
- return optimize(relation, plannerContext);
- } finally {
- _planner.close();
- _planner.reset();
- }
+    // No shared planner needs to be closed/reset here: the planner, validator and optimizer used by the three steps
+    // below all come from plannerContext, whose owner is expected to close it (planQuery/explainQuery do so with
+    // try-with-resources).
+ SqlNode validated = validate(sqlNode, plannerContext);
+ RelRoot relation = toRelation(validated, plannerContext);
+ return optimize(relation, plannerContext);
}
- private SqlNode validate(SqlNode parsed)
+ private SqlNode validate(SqlNode parsed, PlannerContext plannerContext)
throws Exception {
// 2. validator to validate.
- SqlNode validated = _validator.validate(parsed);
+ SqlNode validated = plannerContext.getValidator().validate(parsed);
if (null == validated || !validated.getKind().belongsTo(SqlKind.QUERY)) {
throw new IllegalArgumentException(
String.format("unsupported SQL query, cannot validate out a valid
sql from:\n%s", parsed));
@@ -206,9 +182,10 @@ public class QueryEnvironment {
private RelRoot toRelation(SqlNode parsed, PlannerContext plannerContext) {
// 3. convert sqlNode to relNode.
RexBuilder rexBuilder = new RexBuilder(_typeFactory);
- RelOptCluster cluster = RelOptCluster.create(_relOptPlanner, rexBuilder);
+    // The optimizer backing the cluster, the Calcite planner and the validator are per-query objects taken from
+    // plannerContext; the type factory (via the RexBuilder) and the catalog reader are shared QueryEnvironment fields.
+ RelOptCluster cluster =
RelOptCluster.create(plannerContext.getRelOptPlanner(), rexBuilder);
SqlToRelConverter sqlToRelConverter =
- new SqlToRelConverter(_planner, _validator, _catalogReader, cluster,
StandardConvertletTable.INSTANCE,
+ new SqlToRelConverter(plannerContext.getPlanner(),
plannerContext.getValidator(), _catalogReader, cluster,
+ StandardConvertletTable.INSTANCE,
SqlToRelConverter.config().withHintStrategyTable(getHintStrategyTable(plannerContext)));
return sqlToRelConverter.convertQuery(parsed, false, true);
}
@@ -217,8 +194,8 @@ public class QueryEnvironment {
// 4. optimize relNode
// TODO: add support for traits, cost factory.
try {
- _relOptPlanner.setRoot(relRoot.rel);
- return _relOptPlanner.findBestExp();
+ plannerContext.getRelOptPlanner().setRoot(relRoot.rel);
+ return plannerContext.getRelOptPlanner().findBestExp();
} catch (Exception e) {
throw new UnsupportedOperationException(
"Cannot generate a valid execution plan for the given query: " +
RelOptUtil.toString(relRoot.rel), e);
@@ -231,7 +208,6 @@ public class QueryEnvironment {
return queryStagePlanner.makePlan(relRoot);
}
-
// --------------------------------------------------------------------------
// utils
// --------------------------------------------------------------------------
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
index 1997d1f370..859b5e3d60 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
@@ -19,17 +19,53 @@
package org.apache.pinot.query.context;
import java.util.Map;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.hep.HepProgram;
+import org.apache.calcite.prepare.PlannerImpl;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.pinot.query.planner.logical.LogicalPlanner;
+import org.apache.pinot.query.validate.Validator;
/**
* PlannerContext is an object that holds all contextual information during
planning phase.
*
- * TODO: currently the planner context is not used since we don't support
option or query rewrite. This construct is
- * here as a placeholder for the parsed out options.
+ * TODO: option and query rewrite are not supported yet.
+ *
+ * <p>A PlannerContext holds the per-query planning objects (Calcite planner, SQL validator and logical optimizer),
+ * which must not be shared across queries.
*/
-public class PlannerContext {
+public class PlannerContext implements AutoCloseable {
+ private final PlannerImpl _planner;
+
+ private final SqlValidator _validator;
+
+ private final RelOptPlanner _relOptPlanner;
+
private Map<String, String> _options;
+  /**
+   * Creates a fresh set of planning objects for a single query.
+   *
+   * @param config Calcite framework config used to construct the {@link PlannerImpl}
+   * @param catalogReader catalog reader passed to the {@link Validator}
+   * @param typeFactory type factory passed to the {@link Validator}
+   * @param hepProgram HEP rule program passed to the {@link LogicalPlanner}
+   */
+  // NOTE(review): QueryEnvironment passes the same HepProgram instance to every PlannerContext it creates. In Calcite
+  // versions where HepProgram keeps mutable execution state (e.g. match limit/order/group set while a HepPlanner
+  // runs it), sharing one instance across concurrently running planners is not thread-safe. Confirm for the
+  // Calcite version in use, or build a HepProgram per context.
+ public PlannerContext(FrameworkConfig config, Prepare.CatalogReader
catalogReader, RelDataTypeFactory typeFactory,
+ HepProgram hepProgram) {
+ _planner = new PlannerImpl(config);
+ _validator = new Validator(SqlStdOperatorTable.instance(), catalogReader,
typeFactory);
+ _relOptPlanner = new LogicalPlanner(hepProgram, Contexts.EMPTY_CONTEXT);
+ }
+
+  /** Returns this query's Calcite planner, created by the constructor and closed by {@link #close()}. */
+ public PlannerImpl getPlanner() {
+ return _planner;
+ }
+
+  /** Returns this query's SQL validator, created by the constructor. */
+ public SqlValidator getValidator() {
+ return _validator;
+ }
+
+  /** Returns this query's logical optimizer (a {@link LogicalPlanner}), created by the constructor. */
+ public RelOptPlanner getRelOptPlanner() {
+ return _relOptPlanner;
+ }
+
public void setOptions(Map<String, String> options) {
_options = options;
}
@@ -37,4 +73,10 @@ public class PlannerContext {
public Map<String, String> getOptions() {
return _options;
}
+
+  /**
+   * Releases the per-query Calcite planner held by this context.
+   *
+   * <p>Declared without {@code throws Exception}: the planner's {@code close()} throws no checked exception, so
+   * callers using try-with-resources are not forced to handle one. Narrowing the clause is source-compatible
+   * for existing callers.
+   */
+  @Override
+  public void close() {
+    _planner.close();
+  }
}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index cb666bfeba..dcd7c5c11a 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -19,8 +19,12 @@
package org.apache.pinot.query;
import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.core.transport.ServerInstance;
@@ -176,6 +180,56 @@ public class QueryCompilationTest extends
QueryEnvironmentTestBase {
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().get(0).toString(),
"Server_localhost_1");
}
+  // Test that query planning can safely run on multiple threads concurrently.
+  @Test
+  public void testPlanQueryMultiThread()
+      throws Exception {
+    final String joinQuery = "SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON a.col1 = b.col2";
+    final String selectQuery = "SELECT * FROM a";
+    final int numThreads = 10;
+    // Plans and failures collected from the worker threads; both collections are guarded by 'lock'.
+    Map<String, ArrayList<QueryPlan>> queryPlans = new HashMap<>();
+    List<Throwable> failures = new ArrayList<>();
+    Lock lock = new ReentrantLock();
+    ArrayList<Thread> threads = new ArrayList<>();
+    for (int i = 0; i < numThreads; i++) {
+      // Alternate between the two queries so that both are planned concurrently.
+      String query = (i % 2 == 0) ? joinQuery : selectQuery;
+      threads.add(new Thread(() -> {
+        try {
+          QueryPlan queryPlan = _queryEnvironment.planQuery(query);
+          lock.lock();
+          try {
+            // Keyed by the query string so that every thread's plan is kept.
+            queryPlans.computeIfAbsent(query, k -> new ArrayList<>()).add(queryPlan);
+          } finally {
+            lock.unlock();
+          }
+        } catch (Throwable t) {
+          // An exception on a worker thread does not fail the test by itself; hand it to the main thread.
+          lock.lock();
+          try {
+            failures.add(t);
+          } finally {
+            lock.unlock();
+          }
+        }
+      }));
+    }
+    for (Thread t : threads) {
+      t.start();
+    }
+    for (Thread t : threads) {
+      t.join();
+    }
+    if (!failures.isEmpty()) {
+      Assert.fail("Query planning failed on " + failures.size() + " of " + numThreads + " threads",
+          failures.get(0));
+    }
+    // Every thread must have contributed exactly one plan for its query (even indices plan the join query).
+    Assert.assertEquals(queryPlans.size(), 2);
+    Assert.assertEquals(queryPlans.get(joinQuery).size(), (numThreads + 1) / 2);
+    Assert.assertEquals(queryPlans.get(selectQuery).size(), numThreads / 2);
+    // NOTE(review): plans are not compared with equals() here. Unless QueryPlan overrides equals(), that would be an
+    // identity check between distinct plan objects; add a structural comparison once one is available.
+  }
+
// --------------------------------------------------------------------------
// Test Utils.
// --------------------------------------------------------------------------
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]