This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0736cede79 Adding a plan listener to report each rule evaluation time
in the optimizer (#15192)
0736cede79 is described below
commit 0736cede79717fb08c6cddbfbd4de0e522c80ed7
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Mar 6 03:35:39 2025 -0800
Adding a plan listener to report each rule evaluation time in the optimizer
(#15192)
---
.../MultiStageBrokerRequestHandler.java | 30 +++-
.../tests/OfflineClusterIntegrationTest.java | 87 ++++++-----
.../org/apache/pinot/query/QueryEnvironment.java | 60 ++++++--
.../apache/pinot/query/context/PlannerContext.java | 17 ++-
.../query/context/RuleTimingPlannerListener.java | 160 +++++++++++++++++++++
.../query/planner/explain/PinotRelJsonWriter.java | 28 ++++
6 files changed, 326 insertions(+), 56 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 80dba42f8e..1a04cec3c4 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -204,6 +205,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
}
String plan = queryPlanResult.getExplainPlan();
Set<String> tableNames = queryPlanResult.getTableNames();
+ Map<String, String> extraFields = queryPlanResult.getExtraFields();
TableAuthorizationResult tableAuthorizationResult =
hasTableAccess(requesterIdentity, tableNames, requestContext,
httpHeaders);
if (!tableAuthorizationResult.hasAccess()) {
@@ -213,7 +215,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
}
throw new WebApplicationException("Permission denied. " +
failureMessage, Response.Status.FORBIDDEN);
}
- return constructMultistageExplainPlan(query, plan);
+ return constructMultistageExplainPlan(query, plan, extraFields);
case SELECT:
default:
try {
@@ -263,7 +265,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan();
Set<QueryServerInstance> servers = new HashSet<>();
- for (DispatchablePlanFragment planFragment:
dispatchableSubPlan.getQueryStageMap().values()) {
+ for (DispatchablePlanFragment planFragment :
dispatchableSubPlan.getQueryStageMap().values()) {
servers.addAll(planFragment.getServerInstances());
}
@@ -521,12 +523,26 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
}
}
- private BrokerResponse constructMultistageExplainPlan(String sql, String
plan) {
+ private BrokerResponse constructMultistageExplainPlan(String sql, String
plan, Map<String, String> extraFields) {
BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
- List<Object[]> rows = new ArrayList<>();
- rows.add(new Object[]{sql, plan});
- DataSchema multistageExplainResultSchema = new DataSchema(new
String[]{"SQL", "PLAN"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.STRING});
+ int totalFieldCount = extraFields.size() + 2;
+ String[] fieldNames = new String[totalFieldCount];
+ Object[] fieldValues = new Object[totalFieldCount];
+ fieldNames[0] = "SQL";
+ fieldValues[0] = sql;
+ fieldNames[1] = "PLAN";
+ fieldValues[1] = plan;
+ int i = 2;
+ for (Map.Entry<String, String> entry : extraFields.entrySet()) {
+ fieldNames[i] = entry.getKey().toUpperCase();
+ fieldValues[i] = entry.getValue();
+ i++;
+ }
+ DataSchema.ColumnDataType[] columnDataTypes = new
DataSchema.ColumnDataType[totalFieldCount];
+ Arrays.fill(columnDataTypes, DataSchema.ColumnDataType.STRING);
+ DataSchema multistageExplainResultSchema = new DataSchema(fieldNames,
columnDataTypes);
+ List<Object[]> rows = new ArrayList<>(1);
+ rows.add(fieldValues);
brokerResponse.setResultTable(new
ResultTable(multistageExplainResultSchema, rows));
return brokerResponse;
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 8c3552ffe8..7e2e0f9ea1 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -3453,44 +3453,63 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
// Replace string "docs:[0-9]+" with "docs:*" so that test doesn't fail
when number of documents change. This is
// needed because both OfflineClusterIntegrationTest and
MultiNodesOfflineClusterIntegrationTest run this test
// case with different number of documents in the segment.
- response1 = response1.replaceAll("docs:[0-9]+", "docs:*");
-
- //@formatter:off
- assertEquals(response1, "{"
- +
"\"dataSchema\":{\"columnNames\":[\"SQL\",\"PLAN\"],\"columnDataTypes\":[\"STRING\",\"STRING\"]},"
- + "\"rows\":[["
- + "\"EXPLAIN PLAN WITHOUT IMPLEMENTATION FOR SELECT count(*) AS count,
Carrier AS name FROM mytable "
- + "GROUP BY name ORDER BY 1\","
- + "\"Execution Plan\\n"
- + "LogicalSort(sort0=[$0], dir0=[ASC])\\n"
- + " PinotLogicalSortExchange("
- + "distribution=[hash], collation=[[0]], isSortOnSender=[false],
isSortOnReceiver=[true])\\n"
- + " LogicalProject(count=[$1], name=[$0])\\n"
- + " PinotLogicalAggregate(group=[{0}], agg#0=[COUNT($1)],
aggType=[FINAL])\\n"
- + " PinotLogicalExchange(distribution=[hash[0]])\\n"
- + " PinotLogicalAggregate(group=[{17}], agg#0=[COUNT()],
aggType=[LEAF])\\n"
- + " LogicalTableScan(table=[[default, mytable]])\\n"
- + "\"]]}");
- //@formatter:on
+ response1 = response1.replaceAll("docs:[0-9]+", "docs:*")
+ .replaceAll("Time: \\d+\\.\\d+", "Time:*");
+
+ JsonNode response1Json = JsonUtils.stringToJsonNode(response1);
+
assertEquals(response1Json.get("dataSchema").get("columnNames").get(0).asText(),
"SQL");
+
assertEquals(response1Json.get("dataSchema").get("columnNames").get(1).asText(),
"PLAN");
+
assertEquals(response1Json.get("dataSchema").get("columnNames").get(2).asText(),
"RULE_TIMINGS");
+
assertEquals(response1Json.get("dataSchema").get("columnDataTypes").get(0).asText(),
"STRING");
+
assertEquals(response1Json.get("dataSchema").get("columnDataTypes").get(1).asText(),
"STRING");
+
assertEquals(response1Json.get("dataSchema").get("columnDataTypes").get(2).asText(),
"STRING");
+
+ assertEquals(response1Json.get("rows").get(0).get(0).asText(),
+ "EXPLAIN PLAN WITHOUT IMPLEMENTATION FOR SELECT count(*) AS count,
Carrier AS name FROM mytable GROUP BY name"
+ + " ORDER BY 1");
+ assertEquals(response1Json.get("rows").get(0).get(1).asText(), "Execution
Plan\n"
+ + "LogicalSort(sort0=[$0], dir0=[ASC])\n"
+ + " PinotLogicalSortExchange(distribution=[hash], collation=[[0]],
isSortOnSender=[false], "
+ + "isSortOnReceiver=[true])\n"
+ + " LogicalProject(count=[$1], name=[$0])\n"
+ + " PinotLogicalAggregate(group=[{0}], agg#0=[COUNT($1)],
aggType=[FINAL])\n"
+ + " PinotLogicalExchange(distribution=[hash[0]])\n"
+ + " PinotLogicalAggregate(group=[{17}], agg#0=[COUNT()],
aggType=[LEAF])\n"
+ + " LogicalTableScan(table=[[default, mytable]])\n");
+ assertEquals(response1Json.get("rows").get(0).get(2).asText(), "Rule
Execution Times\n"
+ + "Rule: AggregateProjectMergeRule -> Time:*\n"
+ + "Rule: Project -> Time:*\n"
+ + "Rule: AggregateRemoveRule -> Time:*\n"
+ + "Rule: SortRemoveRule -> Time:*\n");
// In the query below, FlightNum column has an inverted index and there is
no data satisfying the predicate
// "FlightNum < 0". Hence, all segments are pruned out before query
execution on the server side.
// language=sql
String query2 = "EXPLAIN PLAN WITHOUT IMPLEMENTATION FOR SELECT * FROM
mytable WHERE FlightNum < 0";
- String response2 = postQuery(query2).get("resultTable").toString();
-
- //@formatter:off
- Pattern pattern = Pattern.compile("\\{"
- +
"\"dataSchema\":\\{\"columnNames\":\\[\"SQL\",\"PLAN\"],\"columnDataTypes\":\\[\"STRING\",\"STRING\"]},"
- + "\"rows\":\\[\\[\"EXPLAIN PLAN WITHOUT IMPLEMENTATION FOR SELECT \\*
FROM mytable WHERE FlightNum < 0\","
- + "\"Execution Plan.."
- + "LogicalProject\\(.*\\).."
- + " LogicalFilter\\(condition=\\[<\\(.*, 0\\)]\\).."
- + " LogicalTableScan\\(table=\\[\\[default, mytable]]\\)..\""
- + "]]}");
- //@formatter:on
- boolean found = pattern.matcher(response2).find();
- assertTrue(found, "Pattern " + pattern + " not found in " + response2);
+ String response2 = postQuery(query2).get("resultTable").toString()
+ .replaceAll("Time: \\d+\\.\\d+", "Time: *");
+
+ JsonNode response2Json = JsonUtils.stringToJsonNode(response2);
+
assertEquals(response2Json.get("dataSchema").get("columnNames").get(0).asText(),
"SQL");
+
assertEquals(response2Json.get("dataSchema").get("columnNames").get(1).asText(),
"PLAN");
+
assertEquals(response2Json.get("dataSchema").get("columnNames").get(2).asText(),
"RULE_TIMINGS");
+
assertEquals(response2Json.get("dataSchema").get("columnDataTypes").get(0).asText(),
"STRING");
+
assertEquals(response2Json.get("dataSchema").get("columnDataTypes").get(1).asText(),
"STRING");
+
assertEquals(response2Json.get("dataSchema").get("columnDataTypes").get(2).asText(),
"STRING");
+ assertEquals(response2Json.get("rows").get(0).get(0).asText(),
+ "EXPLAIN PLAN WITHOUT IMPLEMENTATION FOR SELECT * FROM mytable WHERE
FlightNum < 0");
+ assertTrue(Pattern.compile(
+ "Execution Plan\n"
+ + "LogicalProject\\(.*\\)\n"
+ + " LogicalFilter\\(condition=\\[<\\(.*, 0\\)]\\)\n"
+ + " LogicalTableScan\\(table=\\[\\[default, mytable]]\\)\n"
+ ).matcher(response2Json.get("rows").get(0).get(1).asText()).find());
+ assertEquals(response2Json.get("rows").get(0).get(2).asText(),
+ "Rule Execution Times\n"
+ + "Rule: Project -> Time: *\n"
+ + "Rule: FilterProjectTransposeRule -> Time: *\n"
+ + "Rule: Filter -> Time: *\n"
+ + "Rule: ProjectFilterTransposeRule -> Time: *\n");
}
/** Test to make sure we are properly handling string comparisons in
predicates. */
@@ -4016,7 +4035,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
@Test(dataProvider = "useBothQueryEngines")
public void testResponseWithClientRequestId(boolean useMultiStageQueryEngine)
- throws Exception {
+ throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
String clientRequestId = UUID.randomUUID().toString();
String sqlQuery =
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 472527ca02..d3d7f3614e 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -62,6 +63,7 @@ import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.query.context.RuleTimingPlannerListener;
import org.apache.pinot.query.planner.PlannerUtils;
import org.apache.pinot.query.planner.SubPlan;
import org.apache.pinot.query.planner.explain.AskingServerStageExplainer;
@@ -93,9 +95,9 @@ import org.slf4j.LoggerFactory;
* tables involved in the query.
*/
- //TODO: We should consider splitting this class in two: One that is used for
parsing and one that is used for
- // executing queries. This would allow us to remove the worker manager from
the parsing environment and therefore
- // make sure there is a worker manager when executing queries.
+//TODO: We should consider splitting this class in two: One that is used for
parsing and one that is used for
+// executing queries. This would allow us to remove the worker manager from
the parsing environment and therefore
+// make sure there is a worker manager when executing queries.
@Value.Enclosing
public class QueryEnvironment {
private static final Logger LOGGER =
LoggerFactory.getLogger(QueryEnvironment.class);
@@ -138,8 +140,15 @@ public class QueryEnvironment {
private PlannerContext getPlannerContext(SqlNodeAndOptions
sqlNodeAndOptions) {
WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions);
HepProgram traitProgram = getTraitProgram(workerManager);
+ SqlExplainFormat format = SqlExplainFormat.DOT;
+ if (sqlNodeAndOptions.getSqlNode().getKind().equals(SqlKind.EXPLAIN)) {
+ SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
+ if (explain.getFormat() != null) {
+ format = explain.getFormat();
+ }
+ }
return new PlannerContext(_config, _catalogReader, _typeFactory,
_optProgram, traitProgram,
- sqlNodeAndOptions.getOptions(), _envConfig);
+ sqlNodeAndOptions.getOptions(), _envConfig, format);
}
public Set<String> getResolvedTables() {
@@ -189,7 +198,7 @@ public class QueryEnvironment {
// Each SubPlan should be able to run independently from Broker then set
the results into the dependent
// SubPlan for further processing.
DispatchableSubPlan dispatchableSubPlan = toDispatchableSubPlan(relRoot,
plannerContext, requestId);
- return new QueryPlannerResult(dispatchableSubPlan, null,
dispatchableSubPlan.getTableNames());
+ return getQueryPlannerResult(plannerContext, dispatchableSubPlan, null,
dispatchableSubPlan.getTableNames());
} catch (CalciteContextException e) {
throw new RuntimeException("Error composing query plan for '" + sqlQuery
+ "': " + e.getMessage() + "'", e);
} catch (Throwable t) {
@@ -219,19 +228,20 @@ public class QueryEnvironment {
try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions))
{
SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
RelRoot relRoot = compileQuery(explain.getExplicandum(), plannerContext);
+ SqlExplainFormat format = plannerContext.getSqlExplainFormat();
if (explain instanceof SqlPhysicalExplain) {
// get the physical plan for query.
DispatchableSubPlan dispatchableSubPlan =
toDispatchableSubPlan(relRoot, plannerContext, requestId);
- return new QueryPlannerResult(null,
PhysicalExplainPlanVisitor.explain(dispatchableSubPlan),
- dispatchableSubPlan.getTableNames());
+ return getQueryPlannerResult(plannerContext, dispatchableSubPlan,
+ PhysicalExplainPlanVisitor.explain(dispatchableSubPlan),
dispatchableSubPlan.getTableNames());
} else {
// get the logical plan for query.
- SqlExplainFormat format = explain.getFormat() == null ?
SqlExplainFormat.DOT : explain.getFormat();
SqlExplainLevel level =
explain.getDetailLevel() == null ?
SqlExplainLevel.DIGEST_ATTRIBUTES : explain.getDetailLevel();
Set<String> tableNames =
RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel);
if (!explain.withImplementation() || onServerExplainer == null) {
- return new QueryPlannerResult(null,
PlannerUtils.explainPlan(relRoot.rel, format, level), tableNames);
+ return getQueryPlannerResult(plannerContext, null,
PlannerUtils.explainPlan(relRoot.rel, format, level),
+ tableNames);
} else {
Map<String, String> options = sqlNodeAndOptions.getOptions();
boolean explainPlanVerbose =
QueryOptionsUtils.isExplainPlanVerbose(options);
@@ -249,9 +259,8 @@ public class QueryEnvironment {
RelNode explainedNode =
MultiStageExplainAskingServersUtils.modifyRel(relRoot.rel,
dispatchableSubPlan.getQueryStages(), nodeTracker,
serversExplainer);
- String explainStr = PlannerUtils.explainPlan(explainedNode, format,
level);
-
- return new QueryPlannerResult(null, explainStr,
dispatchableSubPlan.getTableNames());
+ return getQueryPlannerResult(plannerContext, dispatchableSubPlan,
+ PlannerUtils.explainPlan(explainedNode, format, level),
dispatchableSubPlan.getTableNames());
}
}
} catch (Exception e) {
@@ -259,6 +268,16 @@ public class QueryEnvironment {
}
}
+ private QueryEnvironment.QueryPlannerResult
getQueryPlannerResult(PlannerContext plannerContext,
+ DispatchableSubPlan dispatchableSubPlan, String explainStr, Set<String>
tableNames) {
+ Map<String, String> extraFields = new HashMap<>();
+ if
(plannerContext.getPlannerOutput().containsKey(RuleTimingPlannerListener.RULE_TIMINGS))
{
+ extraFields.put(RuleTimingPlannerListener.RULE_TIMINGS,
+
plannerContext.getPlannerOutput().get(RuleTimingPlannerListener.RULE_TIMINGS));
+ }
+ return new QueryPlannerResult(dispatchableSubPlan, explainStr, tableNames,
extraFields);
+ }
+
@VisibleForTesting
public String explainQuery(String sqlQuery, long requestId) {
SqlNodeAndOptions sqlNodeAndOptions =
CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery);
@@ -316,12 +335,14 @@ public class QueryEnvironment {
private final DispatchableSubPlan _dispatchableSubPlan;
private final String _explainPlan;
private final Set<String> _tableNames;
+ private final Map<String, String> _extraFields;
QueryPlannerResult(@Nullable DispatchableSubPlan dispatchableSubPlan,
@Nullable String explainPlan,
- Set<String> tableNames) {
+ Set<String> tableNames, Map<String, String> extraFields) {
_dispatchableSubPlan = dispatchableSubPlan;
_explainPlan = explainPlan;
_tableNames = tableNames;
+ _extraFields = extraFields;
}
public String getExplainPlan() {
@@ -335,6 +356,10 @@ public class QueryEnvironment {
public Set<String> getTableNames() {
return _tableNames;
}
+
+ public Map<String, String> getExtraFields() {
+ return _extraFields;
+ }
}
// --------------------------------------------------------------------------
@@ -391,7 +416,11 @@ public class QueryEnvironment {
try {
RelOptPlanner optPlanner = plannerContext.getRelOptPlanner();
optPlanner.setRoot(relRoot.rel);
+ RuleTimingPlannerListener listener = new
RuleTimingPlannerListener(plannerContext);
+ optPlanner.addListener(listener);
RelNode optimized = optPlanner.findBestExp();
+ listener.printRuleTimings();
+ listener.populateRuleTimings();
RelOptPlanner traitPlanner = plannerContext.getRelTraitPlanner();
traitPlanner.setRoot(optimized);
return traitPlanner.findBestExp();
@@ -485,7 +514,10 @@ public class QueryEnvironment {
public interface Config {
String getDatabase();
- @Nullable // In theory nullable only in tests. We should fix
LiteralOnlyBrokerRequestTest to not need this.
+ /**
+ * In theory nullable only in tests. We should fix
LiteralOnlyBrokerRequestTest to not need this.
+ */
+ @Nullable
TableCache getTableCache();
/**
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
index c797f5a6fe..886e69b040 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.context;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptPlanner;
@@ -27,6 +28,7 @@ import org.apache.calcite.prepare.PlannerImpl;
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelDistributionTraitDef;
import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlExplainFormat;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.pinot.query.QueryEnvironment;
@@ -49,15 +51,20 @@ public class PlannerContext implements AutoCloseable {
private final LogicalPlanner _relTraitPlanner;
private final Map<String, String> _options;
+ private final Map<String, String> _plannerOutput;
+ private final SqlExplainFormat _sqlExplainFormat;
public PlannerContext(FrameworkConfig config, Prepare.CatalogReader
catalogReader, RelDataTypeFactory typeFactory,
- HepProgram optProgram, HepProgram traitProgram, Map<String, String>
options, QueryEnvironment.Config envConfig) {
+ HepProgram optProgram, HepProgram traitProgram, Map<String, String>
options, QueryEnvironment.Config envConfig,
+ SqlExplainFormat sqlExplainFormat) {
_planner = new PlannerImpl(config);
_validator = new Validator(config.getOperatorTable(), catalogReader,
typeFactory);
_relOptPlanner = new LogicalPlanner(optProgram, Contexts.EMPTY_CONTEXT,
config.getTraitDefs());
_relTraitPlanner = new LogicalPlanner(traitProgram, Contexts.of(envConfig),
Collections.singletonList(RelDistributionTraitDef.INSTANCE));
_options = options;
+ _plannerOutput = new HashMap<>();
+ _sqlExplainFormat = sqlExplainFormat;
}
public PlannerImpl getPlanner() {
@@ -84,4 +91,12 @@ public class PlannerContext implements AutoCloseable {
public void close() {
_planner.close();
}
+
+ public Map<String, String> getPlannerOutput() {
+ return _plannerOutput;
+ }
+
+ public SqlExplainFormat getSqlExplainFormat() {
+ return _sqlExplainFormat;
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/RuleTimingPlannerListener.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/RuleTimingPlannerListener.java
new file mode 100644
index 0000000000..7d0e1b5d19
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/RuleTimingPlannerListener.java
@@ -0,0 +1,160 @@
+package org.apache.pinot.query.context;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.calcite.plan.RelOptListener;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RuleTimingPlannerListener implements RelOptListener {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RuleTimingPlannerListener.class);
+ public static final String RULE_TIMINGS = "RULE_TIMINGS";
+
+ private final PlannerContext _plannerContext;
+ private final Map<RelOptRule, Long> _ruleStartTimes = new HashMap<>();
+ private final Map<RelOptRule, Long> _ruleDurations = new HashMap<>();
+
+ public RuleTimingPlannerListener(PlannerContext plannerContext) {
+ _plannerContext = plannerContext;
+ }
+
+ @Override
+ public void ruleAttempted(RuleAttemptedEvent event) {
+ // Capture start time when a rule is attempted
+ if (event.isBefore()) {
+ _ruleStartTimes.put(event.getRuleCall().getRule(), System.nanoTime());
+ } else {
+ if (_ruleStartTimes.containsKey(event.getRuleCall().getRule())) {
+ long duration = System.nanoTime() -
_ruleStartTimes.get(event.getRuleCall().getRule());
+ _ruleDurations.put(event.getRuleCall().getRule(),
+ _ruleDurations.getOrDefault(event.getRuleCall().getRule(), 0L) +
duration);
+ }
+ }
+ }
+
+ @Override
+ public void ruleProductionSucceeded(RuleProductionEvent event) {
+ }
+
+ @Override
+ public void relEquivalenceFound(RelEquivalenceEvent event) {
+ /* Not used */
+ }
+
+ @Override
+ public void relDiscarded(RelDiscardedEvent event) {
+ /* Not used */
+ }
+
+ @Override
+ public void relChosen(RelChosenEvent event) {
+ /* Not used */
+ }
+
+ public void printRuleTimings() {
+ LOGGER.info(getRuleTimings(SqlExplainFormat.DOT));
+ }
+
+ public void populateRuleTimings() {
+ _plannerContext.getPlannerOutput().put(RULE_TIMINGS,
getRuleTimings(_plannerContext.getSqlExplainFormat()));
+ }
+
+ public String getRuleTimings(SqlExplainFormat format) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ switch (format) {
+ case XML:
+ pw.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
+ pw.println("<RuleExecutionTimes>");
+ for (Map.Entry<RelOptRule, Long> entry : _ruleDurations.entrySet()) {
+ String ruleName = entry.getKey().toString()
+ .replace("&", "&amp;")
+ .replace("<", "&lt;")
+ .replace(">", "&gt;")
+ .replace("\"", "&quot;")
+ .replace("'", "&apos;")
+ pw.println("\t<Rule>");
+ pw.println("\t\t<Name>" + ruleName + "</Name>");
+ pw.println("\t\t<Time>" + entry.getValue() / 1_000_000.0 +
"</Time>");
+ pw.println("\t</Rule>");
+ }
+ pw.println("</RuleExecutionTimes>");
+ break;
+ case JSON:
+ pw.println("{");
+ pw.println(" \"ruleExecutionTimes\": [");
+ boolean firstEntry = true;
+ for (Map.Entry<RelOptRule, Long> entry : _ruleDurations.entrySet()) {
+ if (!firstEntry) {
+ pw.println(",");
+ }
+ firstEntry = false;
+ // Escape special JSON characters
+ String ruleName = entry.getKey().toString()
+ .replace("\\", "\\\\")
+ .replace("\"", "\\\"")
+ .replace("\b", "\\b")
+ .replace("\f", "\\f")
+ .replace("\n", "\\n")
+ .replace("\r", "\\r")
+ .replace("\t", "\\t");
+ double timeMillis = entry.getValue() / 1_000_000.0;
+ pw.println(" {");
+ pw.print(" \"rule\": \"");
+ pw.print(ruleName);
+ pw.println("\", ");
+ pw.print(" \"time\": ");
+ pw.printf("%.2f\n", timeMillis); // Format to 2 decimal places
+ pw.print(" }");
+ }
+ pw.println(" ]");
+ pw.println("}");
+ break;
+ case DOT:
+ pw.println("digraph PlannerTimings {");
+ for (Map.Entry<RelOptRule, Long> entry : _ruleDurations.entrySet()) {
+ pw.print("Rule: ");
+ pw.print(entry.getKey());
+ pw.print(" -> Time: ");
+ pw.println(entry.getValue() / 1_000_000.0);
+ }
+ pw.println("}");
+ break;
+ default:
+ pw.println("Rule Execution Times");
+ for (Map.Entry<RelOptRule, Long> entry : _ruleDurations.entrySet()) {
+ pw.print("Rule: ");
+ pw.print(entry.getKey());
+ pw.print(" -> Time: ");
+ pw.println(entry.getValue() / 1_000_000.0);
+ }
+ break;
+ }
+ pw.flush();
+ return sw.toString();
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PinotRelJsonWriter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PinotRelJsonWriter.java
index 5bce1a81bd..6a42225cea 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PinotRelJsonWriter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PinotRelJsonWriter.java
@@ -21,8 +21,11 @@ package org.apache.pinot.query.planner.explain;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.externalize.RelJson;
import org.apache.calcite.rel.externalize.RelJsonWriter;
+import org.apache.calcite.util.JsonBuilder;
import org.apache.calcite.util.Pair;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -30,6 +33,11 @@ import org.checkerframework.checker.nullness.qual.Nullable;
* Extends {@link RelJsonWriter} to add the type of the relational algebra
node.
*/
public class PinotRelJsonWriter extends RelJsonWriter {
+
+ public PinotRelJsonWriter() {
+ super(new JsonBuilder(), relJson -> new PinotRelJson(relJson));
+ }
+
@Override
protected void explain_(RelNode rel, List<Pair<String, @Nullable Object>>
values) {
super.explain_(rel, values);
@@ -41,4 +49,24 @@ public class PinotRelJsonWriter extends RelJsonWriter {
map.put("type", rel.getRelTypeName());
}
}
+
+ static class PinotRelJson extends RelJson {
+ private final RelJson _relJson;
+
+ /**
+ * Creates a PinotRelJson.
+ */
+ public PinotRelJson(RelJson relJson) {
+ super(null);
+ _relJson = relJson;
+ }
+
+ @Override
+ public @Nullable Object toJson(@Nullable Object value) {
+ if (value instanceof AggregateNode.AggType) {
+ return ((AggregateNode.AggType) value).name();
+ }
+ return _relJson.toJson(value);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]