This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 281478e819 Enabling LogicalProject pushdown optimizations to eliminate exchange of unused columns (#14198)
281478e819 is described below
commit 281478e81951ce804ba890042002080dfbd201de
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Tue Oct 22 16:56:28 2024 -0700
Enabling LogicalProject pushdown optimizations to eliminate exchange of unused columns (#14198)
---
.../calcite/rel/rules/PinotQueryRuleSets.java | 8 ++
.../org/apache/pinot/query/QueryEnvironment.java | 6 ++
.../src/test/resources/queries/JoinPlans.json | 98 +++++++++++++++++-----
.../resources/queries/WindowFunctionPlans.json | 4 +-
4 files changed, 94 insertions(+), 22 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
index 45f867c11b..5bc55835e7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
@@ -101,6 +101,14 @@ public class PinotQueryRuleSets {
CoreRules.FILTER_PROJECT_TRANSPOSE
);
+ // Project pushdown rules run using a RuleCollection since we want to push down a project as much as possible in a
+ // single HepInstruction.
+ public static final List<RelOptRule> PROJECT_PUSHDOWN_RULES = List.of(
+ CoreRules.PROJECT_FILTER_TRANSPOSE,
+ CoreRules.PROJECT_JOIN_TRANSPOSE,
+ CoreRules.PROJECT_MERGE
+ );
+
// The pruner rules run top-down to ensure Calcite restarts from root node after applying a transformation.
public static final List<RelOptRule> PRUNE_RULES = List.of(
CoreRules.AGGREGATE_PROJECT_MERGE,
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 1681d41c94..629c7ae2c5 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -427,6 +427,12 @@ public class QueryEnvironment {
// Pushdown filters using a single HepInstruction.
hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES);
+ // Pushdown projects after first filter pushdown to minimize projected columns.
+ hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES);
+
+ // Pushdown filters again since filter should be pushed down at the lowest level, after project pushdown.
+ hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES);
+
// ----
// Prune duplicate/unnecessary nodes using a single HepInstruction.
// TODO: We can consider using HepMatchOrder.TOP_DOWN if we find cases where it would help.
diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
index 168c3ceaa3..45fcf62251 100644
--- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
@@ -272,14 +272,14 @@
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$1])",
- "\n LogicalFilter(condition=[IS NOT TRUE($8)])",
- "\n LogicalJoin(condition=[=($6, $7)], joinType=[left])",
- "\n PinotLogicalExchange(distribution=[hash[6]])",
- "\n LogicalProject(col1=[$0], col2=[$1], col30=[$3],
$f1=[$4], col32=[$5], $f10=[$7], col34=[$2])",
- "\n LogicalFilter(condition=[IS NOT TRUE($7)])",
- "\n LogicalJoin(condition=[=($5, $6)], joinType=[left])",
- "\n PinotLogicalExchange(distribution=[hash[5]])",
- "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2],
col30=[$3], $f1=[$5], col32=[$2])",
+ "\n LogicalFilter(condition=[IS NOT TRUE($4)])",
+ "\n LogicalJoin(condition=[=($2, $3)], joinType=[left])",
+ "\n PinotLogicalExchange(distribution=[hash[2]])",
+ "\n LogicalProject(col1=[$0], col2=[$1], col34=[$2])",
+ "\n LogicalFilter(condition=[IS NOT TRUE($5)])",
+ "\n LogicalJoin(condition=[=($3, $4)], joinType=[left])",
+ "\n PinotLogicalExchange(distribution=[hash[3]])",
+ "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2],
col32=[$2])",
"\n LogicalFilter(condition=[IS NOT TRUE($5)])",
"\n LogicalJoin(condition=[=($3, $4)],
joinType=[left])",
"\n
PinotLogicalExchange(distribution=[hash[3]])",
@@ -294,19 +294,21 @@
"\n LogicalFilter(condition=[=($0,
_UTF-8'foo')])",
"\n
LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
- "\n PinotLogicalAggregate(group=[{0}],
agg#0=[MIN($1)])",
- "\n PinotLogicalExchange(distribution=[hash[0]])",
- "\n PinotLogicalAggregate(group=[{0}],
agg#0=[MIN($1)])",
- "\n LogicalProject(col3=[$2], $f1=[true])",
- "\n LogicalFilter(condition=[=($0,
_UTF-8'bar')])",
- "\n LogicalTableScan(table=[[default, b]])",
+ "\n LogicalProject(col3=[$0], $f1=[$1])",
+ "\n PinotLogicalAggregate(group=[{0}],
agg#0=[MIN($1)])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{0}],
agg#0=[MIN($1)])",
+ "\n LogicalProject(col3=[$2], $f1=[true])",
+ "\n LogicalFilter(condition=[=($0,
_UTF-8'bar')])",
+ "\n LogicalTableScan(table=[[default,
b]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
- "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
- "\n PinotLogicalExchange(distribution=[hash[0]])",
- "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
- "\n LogicalProject(col3=[$2], $f1=[true])",
- "\n LogicalFilter(condition=[=($0, _UTF-8'foobar')])",
- "\n LogicalTableScan(table=[[default, b]])",
+ "\n LogicalProject(col3=[$0], $f1=[$1])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{0}],
agg#0=[MIN($1)])",
+ "\n LogicalProject(col3=[$2], $f1=[true])",
+ "\n LogicalFilter(condition=[=($0,
_UTF-8'foobar')])",
+ "\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
@@ -517,6 +519,62 @@
"\n LogicalTableScan(table=[[default, a]])",
"\n"
]
+ },
+ {
+ "description": "Multiple IN and NOT IN joins while selecting count at
top",
+ "sql": "EXPLAIN PLAN FOR SELECT count(*) FROM a WHERE a.col1 = 'foo'
AND col2 = 'xylo' AND a.col4 = 12 AND a.col5 = false AND col3 NOT IN (SELECT
col3 FROM b WHERE col1='foo') AND col3 NOT IN (SELECT col3 FROM b WHERE
col1='bar') AND col3 NOT IN (SELECT col3 FROM b WHERE col1='foobar') AND col3
IN (SELECT col3 FROM b WHERE col1 = 'fork')",
+ "output": [
+ "Execution Plan",
+ "\nPinotLogicalAggregate(group=[{}], agg#0=[COUNT($0)])",
+ "\n PinotLogicalExchange(distribution=[hash])",
+ "\n PinotLogicalAggregate(group=[{}], agg#0=[COUNT()])",
+ "\n LogicalJoin(condition=[=($0, $1)], joinType=[semi])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalProject(col3=[$0])",
+ "\n LogicalFilter(condition=[IS NOT TRUE($3)])",
+ "\n LogicalJoin(condition=[=($1, $2)],
joinType=[left])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
+ "\n LogicalProject(col3=[$0], col34=[$0])",
+ "\n LogicalFilter(condition=[IS NOT TRUE($3)])",
+ "\n LogicalJoin(condition=[=($1, $2)],
joinType=[left])",
+ "\n
PinotLogicalExchange(distribution=[hash[1]])",
+ "\n LogicalProject(col3=[$0], col32=[$0])",
+ "\n LogicalFilter(condition=[IS NOT
TRUE($3)])",
+ "\n LogicalJoin(condition=[=($1, $2)],
joinType=[left])",
+ "\n
PinotLogicalExchange(distribution=[hash[1]])",
+ "\n LogicalProject(col3=[$2],
col30=[$2])",
+ "\n
LogicalFilter(condition=[AND(=($0, _UTF-8'foo'), =($1, _UTF-8'xylo'), =($3,
12), NOT($4))])",
+ "\n
LogicalTableScan(table=[[default, a]])",
+ "\n
PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalProject(col3=[$0],
$f1=[$1])",
+ "\n
PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
+ "\n
PinotLogicalExchange(distribution=[hash[0]])",
+ "\n
PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
+ "\n
LogicalProject(col3=[$2], $f1=[true])",
+ "\n
LogicalFilter(condition=[=($0, _UTF-8'foo')])",
+ "\n
LogicalTableScan(table=[[default, b]])",
+ "\n
PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalProject(col3=[$0], $f1=[$1])",
+ "\n PinotLogicalAggregate(group=[{0}],
agg#0=[MIN($1)])",
+ "\n
PinotLogicalExchange(distribution=[hash[0]])",
+ "\n
PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
+ "\n LogicalProject(col3=[$2],
$f1=[true])",
+ "\n
LogicalFilter(condition=[=($0, _UTF-8'bar')])",
+ "\n
LogicalTableScan(table=[[default, b]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalProject(col3=[$0], $f1=[$1])",
+ "\n PinotLogicalAggregate(group=[{0}],
agg#0=[MIN($1)])",
+ "\n
PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{0}],
agg#0=[MIN($1)])",
+ "\n LogicalProject(col3=[$2], $f1=[true])",
+ "\n LogicalFilter(condition=[=($0,
_UTF-8'foobar')])",
+ "\n LogicalTableScan(table=[[default,
b]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalProject(col3=[$2])",
+ "\n LogicalFilter(condition=[=($0, _UTF-8'fork')])",
+ "\n LogicalTableScan(table=[[default, b]])",
+ "\n"
+ ]
}
]
},
diff --git a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
index ac8ef92784..191dea2fdf 100644
--- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
@@ -3404,7 +3404,7 @@
"sql": "EXPLAIN PLAN FOR WITH windowfunc AS (SELECT a.col1,
ROW_NUMBER() OVER(PARTITION BY a.col2 ORDER BY a.col3) as rownum from a) SELECT
a.col1, a.rownum FROM windowfunc AS a where a.rownum < 5",
"output": [
"Execution Plan",
- "\nLogicalProject(col1=[$0], $1=[$3])",
+ "\nLogicalProject(col1=[$0], w0$o0=[$3])",
"\n LogicalFilter(condition=[<($3, 5)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [2]
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
"\n PinotLogicalSortExchange(distribution=[hash[1]],
collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
@@ -3418,7 +3418,7 @@
"sql": "EXPLAIN PLAN FOR WITH windowfunc AS (SELECT a.col1, RANK()
OVER(PARTITION BY a.col2 ORDER BY a.col3) as rank, DENSE_RANK() OVER(PARTITION
BY a.col2 ORDER BY a.col3) as dense_rank from a) SELECT a.col1, a.rank,
a.dense_rank FROM windowfunc AS a where a.dense_rank < 5",
"output": [
"Execution Plan",
- "\nLogicalProject(col1=[$0], $1=[$3], $2=[$4])",
+ "\nLogicalProject(col1=[$0], w0$o0=[$3], w0$o1=[$4])",
"\n LogicalFilter(condition=[<($4, 5)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [2]
aggs [RANK(), DENSE_RANK()])])",
"\n PinotLogicalSortExchange(distribution=[hash[1]],
collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]