This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ac32016b35 Low-risk optProgram rule enhancements (#16035)
ac32016b35 is described below
commit ac32016b350c9c7571b0c3661709929f8e7714e1
Author: Song Fu <[email protected]>
AuthorDate: Wed Jun 11 13:44:42 2025 -0700
Low-risk optProgram rule enhancements (#16035)
---
.../PinotJoinPushTransitivePredicatesRule.java | 81 ++++++++++++++
.../calcite/rel/rules/PinotQueryRuleSets.java | 9 +-
.../apache/pinot/query/QueryCompilationTest.java | 116 +++++++++++++++++++++
.../resources/queries/ExplainPhysicalPlans.json | 12 +--
.../src/test/resources/queries/JoinPlans.json | 14 +--
5 files changed, 219 insertions(+), 13 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinPushTransitivePredicatesRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinPushTransitivePredicatesRule.java
new file mode 100644
index 0000000000..24d11d2b53
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinPushTransitivePredicatesRule.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptPredicateList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.JoinPushTransitivePredicatesRule;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
+
+
+/**
+ * Pinot variant of Calcite's {@link JoinPushTransitivePredicatesRule}.
+ *
+ * <p>Adds the predicates inferred for each input of a {@link Join} (as reported by
+ * {@link RelMetadataQuery#getPulledUpPredicates}) as filters on top of that input. Unlike the Calcite rule, inferred
+ * predicates are never pushed into the right input of a join that uses the lookup join strategy
+ * ({@link PinotHintOptions.JoinHintOptions#useLookupJoinStrategy}); for such joins only the left input may receive a
+ * new filter.
+ */
+public class PinotJoinPushTransitivePredicatesRule extends JoinPushTransitivePredicatesRule {
+
+  /** Shared rule instance built from Calcite's default {@code JoinPushTransitivePredicatesRule} config. */
+  public static final PinotJoinPushTransitivePredicatesRule INSTANCE =
+      new PinotJoinPushTransitivePredicatesRule(Config.DEFAULT);
+
+  protected PinotJoinPushTransitivePredicatesRule(Config config) {
+    super(config);
+  }
+
+  // The following code is copied from Calcite's JoinPushTransitivePredicatesRule#onMatch and modified so that
+  // inferred predicates are not pushed into the right input of a lookup join, and so that no join identical to the
+  // original is emitted when there is nothing to push.
+  //@formatter:off
+  @Override public void onMatch(RelOptRuleCall call) {
+    Join join = call.rel(0);
+    final RelMetadataQuery mq = call.getMetadataQuery();
+    RelOptPredicateList preds = mq.getPulledUpPredicates(join);
+
+    boolean pushLeft = !preds.leftInferredPredicates.isEmpty();
+    // PINOT MODIFICATION: never push inferred predicates into the right input of a lookup join.
+    boolean pushRight = !preds.rightInferredPredicates.isEmpty()
+        && !PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join);
+
+    // PINOT MODIFICATION: bail out unless at least one input actually receives a new filter. Without this check, a
+    // lookup join with only right-side inferred predicates would be "transformed" into a copy of itself on every match.
+    if (!pushLeft && !pushRight) {
+      return;
+    }
+
+    final RelBuilder relBuilder = call.builder();
+
+    RelNode left = join.getLeft();
+    if (pushLeft) {
+      RelNode curr = left;
+      left = relBuilder.push(left)
+          .filter(preds.leftInferredPredicates).build();
+      call.getPlanner().onCopy(curr, left);
+    }
+
+    RelNode right = join.getRight();
+    if (pushRight) {
+      RelNode curr = right;
+      right = relBuilder.push(right)
+          .filter(preds.rightInferredPredicates).build();
+      call.getPlanner().onCopy(curr, right);
+    }
+
+    RelNode newRel =
+        join.copy(join.getTraitSet(), join.getCondition(), left, right,
+            join.getJoinType(), join.isSemiJoinDone());
+    call.getPlanner().onCopy(join, newRel);
+
+    call.transformTo(newRel);
+  }
+  //@formatter:on
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
index 1e80f94e9e..238bcf89f2 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
@@ -75,6 +75,10 @@ public class PinotQueryRuleSets {
CoreRules.PROJECT_TO_SEMI_JOIN,
PinotSeminJoinDistinctProjectRule.INSTANCE,
+ // Consider semijoin optimizations first before push transitive predicate
+ // Pinot version doesn't push predicates to the right in case of lookup
join
+ PinotJoinPushTransitivePredicatesRule.INSTANCE,
+
// convert non-all union into all-union + distinct
CoreRules.UNION_TO_DISTINCT,
@@ -97,7 +101,8 @@ public class PinotQueryRuleSets {
// Filter pushdown rules run using a RuleCollection since we want to push
down a filter as much as possible in a
// single HepInstruction.
public static final List<RelOptRule> FILTER_PUSHDOWN_RULES = List.of(
- CoreRules.FILTER_INTO_JOIN,
+ // Do not push predicate to the right in case of lookup join
+ PinotFilterIntoJoinRule.INSTANCE,
CoreRules.FILTER_AGGREGATE_TRANSPOSE,
CoreRules.FILTER_SET_OP_TRANSPOSE,
CoreRules.FILTER_PROJECT_TRANSPOSE
@@ -118,6 +123,8 @@ public class PinotQueryRuleSets {
CoreRules.FILTER_MERGE,
CoreRules.AGGREGATE_REMOVE,
CoreRules.SORT_REMOVE,
+ PruneEmptyRules.CORRELATE_LEFT_INSTANCE,
+ PruneEmptyRules.CORRELATE_RIGHT_INSTANCE,
PruneEmptyRules.AGGREGATE_INSTANCE,
PruneEmptyRules.FILTER_INSTANCE,
PruneEmptyRules.JOIN_LEFT_INSTANCE,
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index a955727711..265cbd0748 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -41,6 +41,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.plannode.ProjectNode;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.testng.annotations.DataProvider;
+import org.testng.annotations.Ignore;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
@@ -94,6 +95,121 @@ public class QueryCompilationTest extends
QueryEnvironmentTestBase {
//@formatter:on
}
+ @Test
+ public void testAggregateCaseToFilter2() {
+ // A CASE whose THEN branch is a constant, e.g. "SUM(CASE WHEN col1 = 'a' THEN 3 ELSE 0 END)", is rewritten
+ // into the filtered aggregate "$SUM0(3) FILTER (col1 = 'a')". The plan also computes an unfiltered COUNT()
+ // so the final CASE returns NULL instead of 0 when the input has no rows at all (SUM over no rows is NULL).
+ String query = "EXPLAIN PLAN FOR SELECT SUM(CASE WHEN col1 = 'a' THEN 3
ELSE 0 END) FROM a";
+
+ String explain = _queryEnvironment.explainQuery(query,
RANDOM_REQUEST_ID_GEN.nextLong());
+ //@formatter:off
+ assertEquals(explain,
+ "Execution Plan\n"
+ + "LogicalProject(EXPR$0=[CASE(=($1, 0), null:BIGINT, $0)])\n"
+ + " PinotLogicalAggregate(group=[{}], agg#0=[$SUM0($0)],
agg#1=[COUNT($1)], aggType=[FINAL])\n"
+ + " PinotLogicalExchange(distribution=[hash])\n"
+ + " PinotLogicalAggregate(group=[{}], agg#0=[$SUM0($0) FILTER
$1], agg#1=[COUNT()], aggType=[LEAF])\n"
+ + " LogicalProject($f1=[3], $f2=[=($0, _UTF-8'a')])\n"
+ + " PinotLogicalTableScan(table=[[default, a]])\n");
+ //@formatter:on
+ }
+
+ @Test
+ public void testPruneEmptyCorrelateJoin() {
+ // The outer input of the correlated EXISTS sub-query is provably empty (WHERE 1 = 0), so the whole query
+ // is expected to be pruned down to an empty LogicalValues, e.g. by PruneEmptyRules.CORRELATE_LEFT_INSTANCE
+ // (or its right-side counterpart, CORRELATE_RIGHT_INSTANCE).
+ String query = "EXPLAIN PLAN FOR SELECT *\n"
+ + "FROM (\n"
+ + " SELECT * FROM a WHERE 1 = 0\n"
+ + ") t1\n"
+ + "WHERE EXISTS (\n"
+ + " SELECT 1\n"
+ + " FROM a\n"
+ + " WHERE a.col1 = t1.col1\n"
+ + ");\n";
+
+ String explain = _queryEnvironment.explainQuery(query,
RANDOM_REQUEST_ID_GEN.nextLong());
+ //@formatter:off
+ assertEquals(explain,
+ "Execution Plan\n"
+ + "LogicalValues(tuples=[[]])\n");
+ //@formatter:on
+ }
+
+ @Test
+ public void testJoinPushTransitivePredicate() {
+ // The WHERE predicate on the join key (a.col1 = 1) combined with the equi-join condition a.col1 = b.col1
+ // implies b.col1 = 1, so PinotJoinPushTransitivePredicatesRule should add the inferred filter to the right
+ // input as well: the expected plan has a LogicalFilter under both join inputs.
+ String query = "EXPLAIN PLAN FOR\n"
+ + "SELECT * FROM a\n"
+ + "JOIN b\n"
+ + "ON a.col1 = b.col1\n"
+ + "WHERE a.col1 = 1;\n";
+
+ String explain = _queryEnvironment.explainQuery(query,
RANDOM_REQUEST_ID_GEN.nextLong());
+ //@formatter:off
+ assertEquals(explain,
+ "Execution Plan\n"
+ + "LogicalJoin(condition=[=($0, $9)], joinType=[inner])\n"
+ + " PinotLogicalExchange(distribution=[hash[0]])\n"
+ + " LogicalFilter(condition=[=(CAST($0):INTEGER NOT NULL,
1)])\n"
+ + " PinotLogicalTableScan(table=[[default, a]])\n"
+ + " PinotLogicalExchange(distribution=[hash[0]])\n"
+ + " LogicalFilter(condition=[=(CAST($0):INTEGER NOT NULL,
1)])\n"
+ + " PinotLogicalTableScan(table=[[default, b]])\n");
+ //@formatter:on
+ }
+
+ @Test
+ public void testJoinPushTransitivePredicateLookupJoin() {
+ // Same join as testJoinPushTransitivePredicate, but with the lookup join strategy hint:
+ // PinotJoinPushTransitivePredicatesRule must push the inferred predicate only into the left input, so the
+ // right input (b) stays a bare table scan directly under the join, with no filter and no exchange.
+ String query = "EXPLAIN PLAN FOR\n"
+ + "SELECT /*+ joinOptions(join_strategy='lookup') */ \n"
+ + "* FROM a\n"
+ + "JOIN b\n"
+ + "ON a.col1 = b.col1\n"
+ + "WHERE a.col1 = 1;\n";
+
+ String explain = _queryEnvironment.explainQuery(query,
RANDOM_REQUEST_ID_GEN.nextLong());
+ //@formatter:off
+ assertEquals(explain,
+ "Execution Plan\n"
+ + "LogicalJoin(condition=[=($0, $9)], joinType=[inner])\n"
+ + " PinotLogicalExchange(distribution=[single])\n"
+ + " LogicalFilter(condition=[=(CAST($0):INTEGER NOT NULL,
1)])\n"
+ + " PinotLogicalTableScan(table=[[default, a]])\n"
+ + " PinotLogicalTableScan(table=[[default, b]])\n");
+ //@formatter:on
+ }
+
+  @Ignore("This test requires PRUNE_RULES before BASIC_RULES to pass; however, enabling that would change ~50 "
+      + "hardcoded plans in ResourceBasedQueriesTest. It also needs to be investigated why redundant Project and "
+      + "Exchange nodes appear when the extra pruning is enabled.")
+  @Test
+  public void testAggregateJoinRemove() {
+    // A LEFT (or RIGHT) join cannot change the result of an aggregate above it when that aggregate has no
+    // aggregate calls, or only DISTINCT ones, and references only columns of the preserved side. In that case
+    // the join should be removed entirely. Here COUNT(DISTINCT a.col3) grouped by a.col1 only touches table a,
+    // so table b should disappear from the plan.
+    String query = "EXPLAIN PLAN FOR\n"
+        + "SELECT a.col1, COUNT(DISTINCT a.col3) \n"
+        + "FROM a \n"
+        + "LEFT JOIN b ON a.col2 = b.col2\n"
+        + "GROUP BY a.col1;";
+
+    String explain = _queryEnvironment.explainQuery(query, RANDOM_REQUEST_ID_GEN.nextLong());
+    //@formatter:off
+    assertEquals(explain,
+        "Execution Plan\n"
+        + "PinotLogicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[FINAL])\n"
+        + " PinotLogicalExchange(distribution=[hash[0]])\n"
+        + " PinotLogicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($2)], aggType=[LEAF])\n"
+        + " PinotLogicalTableScan(table=[[default, a]])\n");
+    //@formatter:on
+  }
+
private static void assertGroupBySingletonAfterJoin(DispatchableSubPlan
dispatchableSubPlan, boolean shouldRewrite) {
for (int stageId = 0; stageId <
dispatchableSubPlan.getQueryStageMap().size(); stageId++) {
if (dispatchableSubPlan.getTableNames().size() == 0 &&
!PlannerUtils.isRootPlanFragment(stageId)) {
diff --git
a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
index db28d08439..8f856b5195 100644
--- a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
@@ -152,7 +152,8 @@
" └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
" └── [2]@localhost:1|[1] JOIN\n",
" ├── [2]@localhost:1|[1] PROJECT\n",
- " │ └── [2]@localhost:1|[1] TABLE SCAN (a)
null\n",
+ " │ └── [2]@localhost:1|[1] FILTER\n",
+ " │ └── [2]@localhost:1|[1] TABLE SCAN
(a) null\n",
" └── [2]@localhost:1|[1]
MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
" ├── [3]@localhost:2|[2]
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2,
3]} (Subtree Omitted)\n",
" ├── [3]@localhost:2|[3]
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2,
3]} (Subtree Omitted)\n",
@@ -160,8 +161,7 @@
" └── [3]@localhost:1|[1]
MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2,
3]}\n",
" └── [3]@localhost:1|[1] PROJECT\n",
" └── [3]@localhost:1|[1]
FILTER\n",
- " └── [3]@localhost:1|[1]
TABLE SCAN (b) null\n",
- ""
+ " └── [3]@localhost:1|[1]
TABLE SCAN (b) null\n"
]
},
{
@@ -180,7 +180,8 @@
" └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
" └── [2]@localhost:1|[1] JOIN\n",
" ├── [2]@localhost:1|[1] PROJECT\n",
- " │ └── [2]@localhost:1|[1] TABLE SCAN (a)
null\n",
+ " │ └── [2]@localhost:1|[1] FILTER\n",
+ " │ └── [2]@localhost:1|[1] TABLE SCAN
(a) null\n",
" └── [2]@localhost:1|[1]
MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" ├── [3]@localhost:2|[2]
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[2]} (Subtree
Omitted)\n",
" ├── [3]@localhost:2|[3]
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree
Omitted)\n",
@@ -188,8 +189,7 @@
" └── [3]@localhost:1|[1]
MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
" └── [3]@localhost:1|[1] PROJECT\n",
" └── [3]@localhost:1|[1]
FILTER\n",
- " └── [3]@localhost:1|[1]
TABLE SCAN (b) null\n",
- ""
+ " └── [3]@localhost:1|[1]
TABLE SCAN (b) null\n"
]
},
{
diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json
b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
index 9ce84bc53b..dea372f5fd 100644
--- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
@@ -520,7 +520,8 @@
"\nLogicalJoin(condition=[=($2, $9)], joinType=[semi])",
"\n LogicalJoin(condition=[=($1, $9)], joinType=[semi])",
"\n LogicalJoin(condition=[=($1, $9)], joinType=[semi])",
- "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n LogicalFilter(condition=[<($2, 100)])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast],
relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[SEARCH($1,
Sarg[(-∞.._UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'bar':VARCHAR
CHARACTER SET \"UTF-8\".._UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"),
(_UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"..+∞)]:VARCHAR CHARACTER SET
\"UTF-8\")])",
@@ -548,7 +549,8 @@
"\n LogicalProject(col3=[$1])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
"\n LogicalProject(col2=[$1], col3=[$2])",
- "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n LogicalFilter(condition=[<($2, 100)])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast],
relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[SEARCH($1,
Sarg[(-∞.._UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'bar':VARCHAR
CHARACTER SET \"UTF-8\".._UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"),
(_UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"..+∞)]:VARCHAR CHARACTER SET
\"UTF-8\")])",
@@ -569,12 +571,12 @@
"\n LogicalFilter(condition=[>($2, 10)])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
agg#1=[COUNT($2)], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
- "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
agg#1=[COUNT()], aggType=[LEAF])",
- "\n LogicalJoin(condition=[=($1, $2)], joinType=[semi])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
agg#1=[COUNT()], aggType=[LEAF])\n LogicalJoin(condition=[=($1, $2)],
joinType=[semi])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalJoin(condition=[=($1, $3)],
joinType=[semi])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
- "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n LogicalFilter(condition=[<($2, 100)])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast],
relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[SEARCH($1,
Sarg[(-∞.._UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'bar':VARCHAR
CHARACTER SET \"UTF-8\".._UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"),
(_UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"..+∞)]:VARCHAR CHARACTER SET
\"UTF-8\")])",
@@ -601,7 +603,7 @@
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast],
relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col3=[$2])",
- "\n LogicalFilter(condition=[SEARCH($1,
Sarg[(-∞.._UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'bar':VARCHAR
CHARACTER SET \"UTF-8\".._UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"),
(_UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"..+∞)]:VARCHAR CHARACTER SET
\"UTF-8\")])",
+ "\n LogicalFilter(condition=[AND(SEARCH($1,
Sarg[(-∞.._UTF-8'bar':VARCHAR CHARACTER SET \"UTF-8\"), (_UTF-8'bar':VARCHAR
CHARACTER SET \"UTF-8\".._UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"),
(_UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"..+∞)]:VARCHAR CHARACTER SET
\"UTF-8\"), <($2, 100))])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]