This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 86cb815e03 [Minor] Remove redundant ProjectionExec nodes in sort-based
plans (#20780)
86cb815e03 is described below
commit 86cb815e03e5244a3a2c9757171c252a3a65181d
Author: Daniël Heres <[email protected]>
AuthorDate: Wed Mar 11 07:52:04 2026 +0100
[Minor] Remove redundant ProjectionExec nodes in sort-based plans (#20780)
## Which issue does this PR close?
- Closes #.
## Rationale for this change
ClickBench queries (Q7, Q15, Q16, Q18) contain redundant projections
when sorting by count.
This is probably not a measurable performance improvement, but the
resulting plan is cleaner (for non-TopK sorts the gain could be
measurable).
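## What changes are included in this PR?
When rewriting ORDER BY expressions in terms of the projection,
`rewrite_in_terms_of_projection` now also looks inside aliases, so e.g.
`count(Int64(1))` matches `count(Int64(1)) AS count(*)`. The sort then
references the existing output column instead of requiring extra
projections above and below it.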
## Are these changes tested?
Covered by existing tests; their expected plans were updated.
## Are there any user-facing changes?
---------
Co-authored-by: Claude <[email protected]>
---
datafusion/core/tests/dataframe/mod.rs | 36 ++++---
datafusion/expr/src/expr_rewriter/order_by.rs | 16 +++-
datafusion/sql/tests/cases/plan_to_sql.rs | 2 +-
datafusion/sqllogictest/test_files/clickbench.slt | 110 ++++++++++------------
4 files changed, 80 insertions(+), 84 deletions(-)
diff --git a/datafusion/core/tests/dataframe/mod.rs
b/datafusion/core/tests/dataframe/mod.rs
index c94ab10a9e..b1ee8b09b9 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -3001,24 +3001,22 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
assert_snapshot!(
pretty_format_batches(&sql_results).unwrap(),
@r"
-
+---------------+------------------------------------------------------------------------------------------------------------+
- | plan_type | plan
|
-
+---------------+------------------------------------------------------------------------------------------------------------+
- | logical_plan | Projection: t1.b, count(*)
|
- | | Sort: count(Int64(1)) AS count(*) AS count(*) ASC
NULLS LAST |
- | | Projection: t1.b, count(Int64(1)) AS count(*),
count(Int64(1)) |
- | | Aggregate: groupBy=[[t1.b]],
aggr=[[count(Int64(1))]] |
- | | TableScan: t1 projection=[b]
|
- | physical_plan | ProjectionExec: expr=[b@0 as b, count(*)@1 as count(*)]
|
- | | SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS
LAST] |
- | | SortExec: expr=[count(*)@1 ASC NULLS LAST],
preserve_partitioning=[true] |
- | | ProjectionExec: expr=[b@0 as b, count(Int64(1))@1
as count(*), count(Int64(1))@1 as count(Int64(1))] |
- | | AggregateExec: mode=FinalPartitioned, gby=[b@0
as b], aggr=[count(Int64(1))] |
- | | RepartitionExec: partitioning=Hash([b@0], 4),
input_partitions=1 |
- | | AggregateExec: mode=Partial, gby=[b@0 as b],
aggr=[count(Int64(1))] |
- | | DataSourceExec: partitions=1,
partition_sizes=[1] |
- | |
|
-
+---------------+------------------------------------------------------------------------------------------------------------+
+
+---------------+------------------------------------------------------------------------------------+
+ | plan_type | plan
|
+
+---------------+------------------------------------------------------------------------------------+
+ | logical_plan | Sort: count(*) AS count(*) ASC NULLS LAST
|
+ | | Projection: t1.b, count(Int64(1)) AS count(*)
|
+ | | Aggregate: groupBy=[[t1.b]],
aggr=[[count(Int64(1))]] |
+ | | TableScan: t1 projection=[b]
|
+ | physical_plan | SortPreservingMergeExec: [count(*)@1 ASC NULLS LAST]
|
+ | | SortExec: expr=[count(*)@1 ASC NULLS LAST],
preserve_partitioning=[true] |
+ | | ProjectionExec: expr=[b@0 as b, count(Int64(1))@1 as
count(*)] |
+ | | AggregateExec: mode=FinalPartitioned, gby=[b@0 as
b], aggr=[count(Int64(1))] |
+ | | RepartitionExec: partitioning=Hash([b@0], 4),
input_partitions=1 |
+ | | AggregateExec: mode=Partial, gby=[b@0 as b],
aggr=[count(Int64(1))] |
+ | | DataSourceExec: partitions=1,
partition_sizes=[1] |
+ | |
|
+
+---------------+------------------------------------------------------------------------------------+
"
);
@@ -3028,7 +3026,7 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
+---------------+----------------------------------------------------------------------------+
| plan_type | plan
|
+---------------+----------------------------------------------------------------------------+
- | logical_plan | Sort: count(*) ASC NULLS LAST
|
+ | logical_plan | Sort: count(*) AS count(*) ASC NULLS LAST
|
| | Aggregate: groupBy=[[t1.b]], aggr=[[count(Int64(1)) AS
count(*)]] |
| | TableScan: t1 projection=[b]
|
| physical_plan | SortPreservingMergeExec: [count(*)@1 ASC NULLS LAST]
|
diff --git a/datafusion/expr/src/expr_rewriter/order_by.rs
b/datafusion/expr/src/expr_rewriter/order_by.rs
index 7c6af56c8d..a897e56d27 100644
--- a/datafusion/expr/src/expr_rewriter/order_by.rs
+++ b/datafusion/expr/src/expr_rewriter/order_by.rs
@@ -77,8 +77,10 @@ fn rewrite_in_terms_of_projection(
// assumption is that each item in exprs, such as "b + c" is
// available as an output column named "b + c"
expr.transform(|expr| {
- // search for unnormalized names first such as "c1" (such as aliases)
- if let Some(found) = proj_exprs.iter().find(|a| (**a) == expr) {
+ // search for unnormalized names first such as "c1" (such as aliases).
+ // Also look inside aliases so e.g. `count(Int64(1))` matches
+ // `count(Int64(1)) AS count(*)`.
+ if let Some(found) = proj_exprs.iter().find(|a| expr_match(&expr, a)) {
let (qualifier, field_name) = found.qualified_name();
let col = Expr::Column(Column::new(qualifier, field_name));
return Ok(Transformed::yes(col));
@@ -235,18 +237,22 @@ mod test {
TestCase {
desc: r#"min(c2) --> "min(c2)" -- (column *named*
"min(t.c2)"!)"#,
input: sort(min(col("c2"))),
- expected: sort(col("min(t.c2)")),
+ expected:
sort(Expr::Column(Column::new_unqualified("min(t.c2)"))),
},
TestCase {
desc: r#"c1 + min(c2) --> "c1 + min(c2)" -- (column *named*
"min(t.c2)"!)"#,
input: sort(col("c1") + min(col("c2"))),
// should be "c1" not t.c1
- expected: sort(col("c1") + col("min(t.c2)")),
+ expected: sort(
+ col("c1") +
Expr::Column(Column::new_unqualified("min(t.c2)")),
+ ),
},
TestCase {
desc: r#"avg(c3) --> "avg(t.c3)" as average (column *named*
"avg(t.c3)", aliased)"#,
input: sort(avg(col("c3"))),
- expected: sort(col("avg(t.c3)").alias("average")),
+ expected: sort(
+
Expr::Column(Column::new_unqualified("avg(t.c3)")).alias("average"),
+ ),
},
];
diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs
b/datafusion/sql/tests/cases/plan_to_sql.rs
index 670046f164..be110ab07e 100644
--- a/datafusion/sql/tests/cases/plan_to_sql.rs
+++ b/datafusion/sql/tests/cases/plan_to_sql.rs
@@ -1984,7 +1984,7 @@ fn test_complex_order_by_with_grouping() -> Result<()> {
}, {
assert_snapshot!(
sql,
- @r#"SELECT j1.j1_id, j1.j1_string, lochierarchy FROM (SELECT
j1.j1_id, j1.j1_string, (grouping(j1.j1_id) + grouping(j1.j1_string)) AS
lochierarchy, grouping(j1.j1_string), grouping(j1.j1_id) FROM j1 GROUP BY
ROLLUP (j1.j1_id, j1.j1_string)) ORDER BY lochierarchy DESC NULLS FIRST, CASE
WHEN (("grouping(j1.j1_id)" + "grouping(j1.j1_string)") = 0) THEN j1.j1_id END
ASC NULLS LAST LIMIT 100"#
+ @"SELECT j1.j1_id, j1.j1_string, (grouping(j1.j1_id) +
grouping(j1.j1_string)) AS lochierarchy FROM j1 GROUP BY ROLLUP (j1.j1_id,
j1.j1_string) ORDER BY lochierarchy DESC NULLS FIRST, CASE WHEN (lochierarchy =
0) THEN j1.j1_id END ASC NULLS LAST LIMIT 100"
);
});
diff --git a/datafusion/sqllogictest/test_files/clickbench.slt
b/datafusion/sqllogictest/test_files/clickbench.slt
index dd558a4f36..881e49cdeb 100644
--- a/datafusion/sqllogictest/test_files/clickbench.slt
+++ b/datafusion/sqllogictest/test_files/clickbench.slt
@@ -205,24 +205,22 @@ query TT
EXPLAIN SELECT "AdvEngineID", COUNT(*) FROM hits WHERE "AdvEngineID" <> 0
GROUP BY "AdvEngineID" ORDER BY COUNT(*) DESC;
----
logical_plan
-01)Projection: hits.AdvEngineID, count(*)
-02)--Sort: count(Int64(1)) AS count(*) AS count(*) DESC NULLS FIRST
-03)----Projection: hits.AdvEngineID, count(Int64(1)) AS count(*),
count(Int64(1))
-04)------Aggregate: groupBy=[[hits.AdvEngineID]], aggr=[[count(Int64(1))]]
-05)--------SubqueryAlias: hits
-06)----------Filter: hits_raw.AdvEngineID != Int16(0)
-07)------------TableScan: hits_raw projection=[AdvEngineID],
partial_filters=[hits_raw.AdvEngineID != Int16(0)]
+01)Sort: count(*) AS count(*) DESC NULLS FIRST
+02)--Projection: hits.AdvEngineID, count(Int64(1)) AS count(*)
+03)----Aggregate: groupBy=[[hits.AdvEngineID]], aggr=[[count(Int64(1))]]
+04)------SubqueryAlias: hits
+05)--------Filter: hits_raw.AdvEngineID != Int16(0)
+06)----------TableScan: hits_raw projection=[AdvEngineID],
partial_filters=[hits_raw.AdvEngineID != Int16(0)]
physical_plan
-01)ProjectionExec: expr=[AdvEngineID@0 as AdvEngineID, count(*)@1 as count(*)]
-02)--SortPreservingMergeExec: [count(Int64(1))@2 DESC]
-03)----SortExec: expr=[count(*)@1 DESC], preserve_partitioning=[true]
-04)------ProjectionExec: expr=[AdvEngineID@0 as AdvEngineID, count(Int64(1))@1
as count(*), count(Int64(1))@1 as count(Int64(1))]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[AdvEngineID@0 as
AdvEngineID], aggr=[count(Int64(1))]
-06)----------RepartitionExec: partitioning=Hash([AdvEngineID@0], 4),
input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[AdvEngineID@0 as
AdvEngineID], aggr=[count(Int64(1))]
-08)--------------FilterExec: AdvEngineID@0 != 0
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-10)------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]},
projection=[AdvEngineID], file_type=parquet, predicate=AdvEngineID@40 != 0,
pruning_predicate=AdvEngineID_null_count@2 != row_count@3 AND
(AdvEngineID_min@0 != 0 OR 0 != AdvEngineID_max@1),
required_guarantees=[AdvEngineID not in (0)]
+01)SortPreservingMergeExec: [count(*)@1 DESC]
+02)--SortExec: expr=[count(*)@1 DESC], preserve_partitioning=[true]
+03)----ProjectionExec: expr=[AdvEngineID@0 as AdvEngineID, count(Int64(1))@1
as count(*)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[AdvEngineID@0 as
AdvEngineID], aggr=[count(Int64(1))]
+05)--------RepartitionExec: partitioning=Hash([AdvEngineID@0], 4),
input_partitions=4
+06)----------AggregateExec: mode=Partial, gby=[AdvEngineID@0 as AdvEngineID],
aggr=[count(Int64(1))]
+07)------------FilterExec: AdvEngineID@0 != 0
+08)--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+09)----------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]},
projection=[AdvEngineID], file_type=parquet, predicate=AdvEngineID@40 != 0,
pruning_predicate=AdvEngineID_null_count@2 != row_count@3 AND
(AdvEngineID_min@0 != 0 OR 0 != AdvEngineID_max@1),
required_guarantees=[AdvEngineID not in (0)]
query II
SELECT "AdvEngineID", COUNT(*) FROM hits WHERE "AdvEngineID" <> 0 GROUP BY
"AdvEngineID" ORDER BY COUNT(*) DESC;
@@ -433,21 +431,19 @@ query TT
EXPLAIN SELECT "UserID", COUNT(*) FROM hits GROUP BY "UserID" ORDER BY
COUNT(*) DESC LIMIT 10;
----
logical_plan
-01)Projection: hits.UserID, count(*)
-02)--Sort: count(Int64(1)) AS count(*) AS count(*) DESC NULLS FIRST, fetch=10
-03)----Projection: hits.UserID, count(Int64(1)) AS count(*), count(Int64(1))
-04)------Aggregate: groupBy=[[hits.UserID]], aggr=[[count(Int64(1))]]
-05)--------SubqueryAlias: hits
-06)----------TableScan: hits_raw projection=[UserID]
+01)Sort: count(*) AS count(*) DESC NULLS FIRST, fetch=10
+02)--Projection: hits.UserID, count(Int64(1)) AS count(*)
+03)----Aggregate: groupBy=[[hits.UserID]], aggr=[[count(Int64(1))]]
+04)------SubqueryAlias: hits
+05)--------TableScan: hits_raw projection=[UserID]
physical_plan
-01)ProjectionExec: expr=[UserID@0 as UserID, count(*)@1 as count(*)]
-02)--SortPreservingMergeExec: [count(Int64(1))@2 DESC], fetch=10
-03)----SortExec: TopK(fetch=10), expr=[count(*)@1 DESC],
preserve_partitioning=[true]
-04)------ProjectionExec: expr=[UserID@0 as UserID, count(Int64(1))@1 as
count(*), count(Int64(1))@1 as count(Int64(1))]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID],
aggr=[count(Int64(1))]
-06)----------RepartitionExec: partitioning=Hash([UserID@0], 4),
input_partitions=1
-07)------------AggregateExec: mode=Partial, gby=[UserID@0 as UserID],
aggr=[count(Int64(1))]
-08)--------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]},
projection=[UserID], file_type=parquet
+01)SortPreservingMergeExec: [count(*)@1 DESC], fetch=10
+02)--SortExec: TopK(fetch=10), expr=[count(*)@1 DESC],
preserve_partitioning=[true]
+03)----ProjectionExec: expr=[UserID@0 as UserID, count(Int64(1))@1 as count(*)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID],
aggr=[count(Int64(1))]
+05)--------RepartitionExec: partitioning=Hash([UserID@0], 4),
input_partitions=1
+06)----------AggregateExec: mode=Partial, gby=[UserID@0 as UserID],
aggr=[count(Int64(1))]
+07)------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]},
projection=[UserID], file_type=parquet
query II rowsort
SELECT "UserID", COUNT(*) FROM hits GROUP BY "UserID" ORDER BY COUNT(*) DESC
LIMIT 10;
@@ -463,21 +459,19 @@ query TT
EXPLAIN SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID",
"SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
----
logical_plan
-01)Projection: hits.UserID, hits.SearchPhrase, count(*)
-02)--Sort: count(Int64(1)) AS count(*) AS count(*) DESC NULLS FIRST, fetch=10
-03)----Projection: hits.UserID, hits.SearchPhrase, count(Int64(1)) AS
count(*), count(Int64(1))
-04)------Aggregate: groupBy=[[hits.UserID, hits.SearchPhrase]],
aggr=[[count(Int64(1))]]
-05)--------SubqueryAlias: hits
-06)----------TableScan: hits_raw projection=[UserID, SearchPhrase]
+01)Sort: count(*) AS count(*) DESC NULLS FIRST, fetch=10
+02)--Projection: hits.UserID, hits.SearchPhrase, count(Int64(1)) AS count(*)
+03)----Aggregate: groupBy=[[hits.UserID, hits.SearchPhrase]],
aggr=[[count(Int64(1))]]
+04)------SubqueryAlias: hits
+05)--------TableScan: hits_raw projection=[UserID, SearchPhrase]
physical_plan
-01)ProjectionExec: expr=[UserID@0 as UserID, SearchPhrase@1 as SearchPhrase,
count(*)@2 as count(*)]
-02)--SortPreservingMergeExec: [count(Int64(1))@3 DESC], fetch=10
-03)----SortExec: TopK(fetch=10), expr=[count(*)@2 DESC],
preserve_partitioning=[true]
-04)------ProjectionExec: expr=[UserID@0 as UserID, SearchPhrase@1 as
SearchPhrase, count(Int64(1))@2 as count(*), count(Int64(1))@2 as
count(Int64(1))]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID,
SearchPhrase@1 as SearchPhrase], aggr=[count(Int64(1))]
-06)----------RepartitionExec: partitioning=Hash([UserID@0, SearchPhrase@1],
4), input_partitions=1
-07)------------AggregateExec: mode=Partial, gby=[UserID@0 as UserID,
SearchPhrase@1 as SearchPhrase], aggr=[count(Int64(1))]
-08)--------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]},
projection=[UserID, SearchPhrase], file_type=parquet
+01)SortPreservingMergeExec: [count(*)@2 DESC], fetch=10
+02)--SortExec: TopK(fetch=10), expr=[count(*)@2 DESC],
preserve_partitioning=[true]
+03)----ProjectionExec: expr=[UserID@0 as UserID, SearchPhrase@1 as
SearchPhrase, count(Int64(1))@2 as count(*)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID,
SearchPhrase@1 as SearchPhrase], aggr=[count(Int64(1))]
+05)--------RepartitionExec: partitioning=Hash([UserID@0, SearchPhrase@1], 4),
input_partitions=1
+06)----------AggregateExec: mode=Partial, gby=[UserID@0 as UserID,
SearchPhrase@1 as SearchPhrase], aggr=[count(Int64(1))]
+07)------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]},
projection=[UserID, SearchPhrase], file_type=parquet
query ITI rowsort
SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID",
"SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
@@ -520,21 +514,19 @@ query TT
EXPLAIN SELECT "UserID", extract(minute FROM
to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits
GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
----
logical_plan
-01)Projection: hits.UserID, m, hits.SearchPhrase, count(*)
-02)--Sort: count(Int64(1)) AS count(*) AS count(*) DESC NULLS FIRST, fetch=10
-03)----Projection: hits.UserID,
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)) AS m,
hits.SearchPhrase, count(Int64(1)) AS count(*), count(Int64(1))
-04)------Aggregate: groupBy=[[hits.UserID, date_part(Utf8("MINUTE"),
to_timestamp_seconds(hits.EventTime)), hits.SearchPhrase]],
aggr=[[count(Int64(1))]]
-05)--------SubqueryAlias: hits
-06)----------TableScan: hits_raw projection=[EventTime, UserID, SearchPhrase]
+01)Sort: count(*) AS count(*) DESC NULLS FIRST, fetch=10
+02)--Projection: hits.UserID,
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)) AS m,
hits.SearchPhrase, count(Int64(1)) AS count(*)
+03)----Aggregate: groupBy=[[hits.UserID, date_part(Utf8("MINUTE"),
to_timestamp_seconds(hits.EventTime)), hits.SearchPhrase]],
aggr=[[count(Int64(1))]]
+04)------SubqueryAlias: hits
+05)--------TableScan: hits_raw projection=[EventTime, UserID, SearchPhrase]
physical_plan
-01)ProjectionExec: expr=[UserID@0 as UserID, m@1 as m, SearchPhrase@2 as
SearchPhrase, count(*)@3 as count(*)]
-02)--SortPreservingMergeExec: [count(Int64(1))@4 DESC], fetch=10
-03)----SortExec: TopK(fetch=10), expr=[count(*)@3 DESC],
preserve_partitioning=[true]
-04)------ProjectionExec: expr=[UserID@0 as UserID,
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as m,
SearchPhrase@2 as SearchPhrase, count(Int64(1))@3 as count(*),
count(Int64(1))@3 as count(Int64(1))]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID,
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2
as SearchPhrase], aggr=[count(Int64(1))]
-06)----------RepartitionExec: partitioning=Hash([UserID@0,
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1,
SearchPhrase@2], 4), input_partitions=1
-07)------------AggregateExec: mode=Partial, gby=[UserID@1 as UserID,
date_part(MINUTE, to_timestamp_seconds(EventTime@0)) as
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2
as SearchPhrase], aggr=[count(Int64(1))]
-08)--------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]},
projection=[EventTime, UserID, SearchPhrase], file_type=parquet
+01)SortPreservingMergeExec: [count(*)@3 DESC], fetch=10
+02)--SortExec: TopK(fetch=10), expr=[count(*)@3 DESC],
preserve_partitioning=[true]
+03)----ProjectionExec: expr=[UserID@0 as UserID,
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as m,
SearchPhrase@2 as SearchPhrase, count(Int64(1))@3 as count(*)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID,
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2
as SearchPhrase], aggr=[count(Int64(1))]
+05)--------RepartitionExec: partitioning=Hash([UserID@0,
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1,
SearchPhrase@2], 4), input_partitions=1
+06)----------AggregateExec: mode=Partial, gby=[UserID@1 as UserID,
date_part(MINUTE, to_timestamp_seconds(EventTime@0)) as
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2
as SearchPhrase], aggr=[count(Int64(1))]
+07)------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]},
projection=[EventTime, UserID, SearchPhrase], file_type=parquet
query IITI rowsort
SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m,
"SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER
BY COUNT(*) DESC LIMIT 10;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]