This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 07804384cb Projection Expression - Input Field Inconsistencies during
Projection (#10088)
07804384cb is described below
commit 07804384cbdcdd2861ec8a279632da32245e28f7
Author: Berkay Şahin <[email protected]>
AuthorDate: Mon Apr 22 22:35:48 2024 +0300
Projection Expression - Input Field Inconsistencies during Projection
(#10088)
* agg fixes
* test updates
* fixing count mismatch
* Update aggregate_statistics.rs
* catch different names
* minor
---
.../src/physical_optimizer/aggregate_statistics.rs | 9 +--
datafusion/core/src/physical_planner.rs | 31 +++++---
datafusion/functions-aggregate/src/first_last.rs | 41 ++++++++---
.../physical-expr/src/equivalence/projection.rs | 6 +-
.../test_files/agg_func_substitute.slt | 8 +--
datafusion/sqllogictest/test_files/aggregate.slt | 12 ++--
datafusion/sqllogictest/test_files/distinct_on.slt | 4 +-
datafusion/sqllogictest/test_files/group_by.slt | 82 +++++++++++-----------
datafusion/sqllogictest/test_files/joins.slt | 8 +--
datafusion/sqllogictest/test_files/subquery.slt | 26 +++++++
10 files changed, 145 insertions(+), 82 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index df54222270..98f8884e49 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -35,9 +35,6 @@ use
datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
#[derive(Default)]
pub struct AggregateStatistics {}
-/// The name of the column corresponding to [`COUNT_STAR_EXPANSION`]
-const COUNT_STAR_NAME: &str = "COUNT(*)";
-
impl AggregateStatistics {
#[allow(missing_docs)]
pub fn new() -> Self {
@@ -144,7 +141,7 @@ fn take_optimizable(node: &dyn ExecutionPlan) ->
Option<Arc<dyn ExecutionPlan>>
fn take_optimizable_table_count(
agg_expr: &dyn AggregateExpr,
stats: &Statistics,
-) -> Option<(ScalarValue, &'static str)> {
+) -> Option<(ScalarValue, String)> {
if let (&Precision::Exact(num_rows), Some(casted_expr)) = (
&stats.num_rows,
agg_expr.as_any().downcast_ref::<expressions::Count>(),
@@ -158,7 +155,7 @@ fn take_optimizable_table_count(
if lit_expr.value() == &COUNT_STAR_EXPANSION {
return Some((
ScalarValue::Int64(Some(num_rows as i64)),
- COUNT_STAR_NAME,
+ casted_expr.name().to_owned(),
));
}
}
@@ -427,7 +424,7 @@ pub(crate) mod tests {
/// What name would this aggregate produce in a plan?
fn column_name(&self) -> &'static str {
match self {
- Self::CountStar => COUNT_STAR_NAME,
+ Self::CountStar => "COUNT(*)",
Self::ColumnA(_) => "COUNT(a)",
}
}
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 301f68c0f2..e6785b1dec 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -87,6 +87,7 @@ use datafusion_expr::expr::{
WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
+use datafusion_expr::expr_vec_fmt;
use
datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery,
@@ -108,6 +109,7 @@ fn create_function_physical_name(
fun: &str,
distinct: bool,
args: &[Expr],
+ order_by: Option<&Vec<Expr>>,
) -> Result<String> {
let names: Vec<String> = args
.iter()
@@ -118,7 +120,12 @@ fn create_function_physical_name(
true => "DISTINCT ",
false => "",
};
- Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))
+
+ let phys_name = format!("{}({}{})", fun, distinct_str, names.join(","));
+
+ Ok(order_by
+ .map(|order_by| format!("{} ORDER BY [{}]", phys_name,
expr_vec_fmt!(order_by)))
+ .unwrap_or(phys_name))
}
fn physical_name(e: &Expr) -> Result<String> {
@@ -238,22 +245,30 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) ->
Result<String> {
return internal_err!("Function `Expr` with name should be
resolved.");
}
- create_function_physical_name(fun.name(), false, &fun.args)
+ create_function_physical_name(fun.name(), false, &fun.args, None)
}
- Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
- create_function_physical_name(&fun.to_string(), false, args)
+ Expr::WindowFunction(WindowFunction {
+ fun,
+ args,
+ order_by,
+ ..
+ }) => {
+ create_function_physical_name(&fun.to_string(), false, args,
Some(order_by))
}
Expr::AggregateFunction(AggregateFunction {
func_def,
distinct,
args,
filter,
- order_by: _,
+ order_by,
null_treatment: _,
}) => match func_def {
- AggregateFunctionDefinition::BuiltIn(..) => {
- create_function_physical_name(func_def.name(), *distinct, args)
- }
+ AggregateFunctionDefinition::BuiltIn(..) =>
create_function_physical_name(
+ func_def.name(),
+ *distinct,
+ args,
+ order_by.as_ref(),
+ ),
AggregateFunctionDefinition::UDF(fun) => {
// TODO: Add support for filter by in AggregateUDF
if filter.is_some() {
diff --git a/datafusion/functions-aggregate/src/first_last.rs
b/datafusion/functions-aggregate/src/first_last.rs
index f8b388a4f3..8dc4cee87a 100644
--- a/datafusion/functions-aggregate/src/first_last.rs
+++ b/datafusion/functions-aggregate/src/first_last.rs
@@ -415,11 +415,9 @@ impl FirstValuePhysicalExpr {
}
pub fn convert_to_last(self) -> LastValuePhysicalExpr {
- let name = if self.name.starts_with("FIRST") {
- format!("LAST{}", &self.name[5..])
- } else {
- format!("LAST_VALUE({})", self.expr)
- };
+ let mut name = format!("LAST{}", &self.name[5..]);
+ replace_order_by_clause(&mut name);
+
let FirstValuePhysicalExpr {
expr,
input_data_type,
@@ -593,11 +591,9 @@ impl LastValuePhysicalExpr {
}
pub fn convert_to_first(self) -> FirstValuePhysicalExpr {
- let name = if self.name.starts_with("LAST") {
- format!("FIRST{}", &self.name[4..])
- } else {
- format!("FIRST_VALUE({})", self.expr)
- };
+ let mut name = format!("FIRST{}", &self.name[4..]);
+ replace_order_by_clause(&mut name);
+
let LastValuePhysicalExpr {
expr,
input_data_type,
@@ -905,6 +901,31 @@ fn convert_to_sort_cols(
.collect::<Vec<_>>()
}
+fn replace_order_by_clause(order_by: &mut String) { // Flips ASC<->DESC and NULLS FIRST<->NULLS LAST for the sort key that ends the first "ORDER BY [...]" list in `order_by`, in place; no-op when there is no such list or no listed ending matches.
+ let suffixes = [ // (current ending, reversed ending) pairs; every ending includes the closing ']'
+ (" DESC NULLS FIRST]", " ASC NULLS LAST]"),
+ (" ASC NULLS FIRST]", " DESC NULLS LAST]"),
+ (" DESC NULLS LAST]", " ASC NULLS FIRST]"),
+ (" ASC NULLS LAST]", " DESC NULLS FIRST]"),
+ ];
+
+ if let Some(start) = order_by.find("ORDER BY [") { // only the first ORDER BY clause in the name is considered
+ if let Some(end) = order_by[start..].find(']') { // NOTE(review): first ']' after the clause start; a ']' inside a column name would end the list early
+ let order_by_start = start + 9; // byte index of '[' ("ORDER BY " is 9 bytes)
+ let order_by_end = start + end; // byte index of the closing ']'
+
+ let column_order = &order_by[order_by_start..=order_by_end]; // "[...]" including both brackets
+ for &(suffix, replacement) in &suffixes { // NOTE(review): ends_with inspects only the LAST sort key, so with several keys (e.g. "[a ASC NULLS LAST, b ASC NULLS LAST]") the earlier keys keep their old direction; if the caller reverses every key, the resulting name is wrong — TODO confirm and flip each comma-separated key
+ if column_order.ends_with(suffix) {
+ let new_order = column_order.replace(suffix, replacement); // the suffix holds the only ']' in column_order, so this rewrites just the tail
+ order_by.replace_range(order_by_start..=order_by_end,
&new_order); // splice the rewritten "[...]" back into the name
+ break; // the endings are mutually exclusive, so stop after the first match
+ }
+ }
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use arrow::array::Int64Array;
diff --git a/datafusion/physical-expr/src/equivalence/projection.rs
b/datafusion/physical-expr/src/equivalence/projection.rs
index 8c747ab8a2..18e350b097 100644
--- a/datafusion/physical-expr/src/equivalence/projection.rs
+++ b/datafusion/physical-expr/src/equivalence/projection.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::Result;
+use datafusion_common::{internal_err, Result};
use crate::expressions::Column;
use crate::PhysicalExpr;
@@ -67,6 +67,10 @@ impl ProjectionMapping {
// Conceptually, `source_expr` and `expression`
should be the same.
let idx = col.index();
let matching_input_field = input_schema.field(idx);
+ if col.name() != matching_input_field.name() {
+ return internal_err!("Input field name {} does
not match with the projection expression {}",
+ matching_input_field.name(),col.name())
+ }
let matching_input_column =
Column::new(matching_input_field.name(), idx);
Ok(Transformed::yes(Arc::new(matching_input_column)))
diff --git a/datafusion/sqllogictest/test_files/agg_func_substitute.slt
b/datafusion/sqllogictest/test_files/agg_func_substitute.slt
index 811bfa864f..7beb20a521 100644
--- a/datafusion/sqllogictest/test_files/agg_func_substitute.slt
+++ b/datafusion/sqllogictest/test_files/agg_func_substitute.slt
@@ -44,11 +44,11 @@ logical_plan
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a,
NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c
ASC NULLS LAST]@1 as result]
-02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
+02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
+06)----------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
07)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
08)--------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
@@ -64,11 +64,11 @@ logical_plan
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a,
NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c
ASC NULLS LAST]@1 as result]
-02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
+02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
+06)----------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
07)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
08)--------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt
b/datafusion/sqllogictest/test_files/aggregate.slt
index 5e4fb10456..8b5b84e766 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -132,9 +132,9 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(agg_order.c1) ORDER BY
[agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]]
02)--TableScan: agg_order projection=[c1, c2, c3]
physical_plan
-01)AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
+01)AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1) ORDER BY
[agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
02)--CoalescePartitionsExec
-03)----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
+03)----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1)
ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
04)------SortExec: expr=[c2@1 DESC,c3@2 ASC NULLS LAST]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]},
projection=[c1, c2, c3], has_header=true
@@ -3520,9 +3520,9 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(convert_first_last_table.c1)
ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]]
02)--TableScan: convert_first_last_table projection=[c1, c3]
physical_plan
-01)AggregateExec: mode=Final, gby=[],
aggr=[FIRST_VALUE(convert_first_last_table.c1)]
+01)AggregateExec: mode=Final, gby=[],
aggr=[FIRST_VALUE(convert_first_last_table.c1) ORDER BY
[convert_first_last_table.c3 DESC NULLS FIRST]]
02)--CoalescePartitionsExec
-03)----AggregateExec: mode=Partial, gby=[],
aggr=[LAST_VALUE(convert_first_last_table.c1)]
+03)----AggregateExec: mode=Partial, gby=[],
aggr=[LAST_VALUE(convert_first_last_table.c1) ORDER BY
[convert_first_last_table.c3 ASC NULLS LAST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]},
projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS
LAST]], has_header=true
@@ -3534,8 +3534,8 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(convert_first_last_table.c1)
ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]]
02)--TableScan: convert_first_last_table projection=[c1, c2]
physical_plan
-01)AggregateExec: mode=Final, gby=[],
aggr=[LAST_VALUE(convert_first_last_table.c1)]
+01)AggregateExec: mode=Final, gby=[],
aggr=[LAST_VALUE(convert_first_last_table.c1) ORDER BY
[convert_first_last_table.c2 ASC NULLS LAST]]
02)--CoalescePartitionsExec
-03)----AggregateExec: mode=Partial, gby=[],
aggr=[FIRST_VALUE(convert_first_last_table.c1)]
+03)----AggregateExec: mode=Partial, gby=[],
aggr=[FIRST_VALUE(convert_first_last_table.c1) ORDER BY
[convert_first_last_table.c2 DESC NULLS FIRST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]},
projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]],
has_header=true
diff --git a/datafusion/sqllogictest/test_files/distinct_on.slt
b/datafusion/sqllogictest/test_files/distinct_on.slt
index fc0df1d5f6..972c935cee 100644
--- a/datafusion/sqllogictest/test_files/distinct_on.slt
+++ b/datafusion/sqllogictest/test_files/distinct_on.slt
@@ -97,10 +97,10 @@ physical_plan
01)ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@1
as c3, FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC
NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@2 as c2]
02)--SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
03)----SortExec: expr=[c1@0 ASC NULLS LAST]
-04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC
NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST],
FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS
LAST, aggregate_test_100.c3 ASC NULLS LAST]]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1],
aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
+07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1],
aggr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC
NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST],
FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS
LAST, aggregate_test_100.c3 ASC NULLS LAST]]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
09)----------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c2, c3], has_header=true
diff --git a/datafusion/sqllogictest/test_files/group_by.slt
b/datafusion/sqllogictest/test_files/group_by.slt
index 7bef738337..e015f7b01d 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -2016,10 +2016,10 @@ physical_plan
01)SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
02)--SortExec: expr=[col0@0 ASC NULLS LAST]
03)----ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY
[r.col0 ASC NULLS LAST]@3 as last_col1]
-04)------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as
col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as
col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS
LAST]]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4),
input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as
col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
+07)------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as
col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS
LAST]]
08)--------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4
as col2, col0@0 as col0, col1@1 as col1]
09)----------------CoalesceBatchesExec: target_batch_size=8192
10)------------------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(col0@0, col0@0)]
@@ -2145,7 +2145,7 @@ logical_plan
03)----TableScan: annotated_data_infinite2 projection=[a, c, d]
physical_plan
01)ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data_infinite2.c)
ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as summation1]
-02)--AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a],
aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=PartiallySorted([1])
+02)--AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a],
aggr=[SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC
NULLS FIRST]], ordering_mode=PartiallySorted([1])
03)----StreamingTableExec: partition_sizes=1, projection=[a, c, d],
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]
query III
@@ -2178,7 +2178,7 @@ logical_plan
03)----TableScan: annotated_data_infinite2 projection=[a, b, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a, b@1 as b,
FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a
DESC NULLS FIRST]@2 as first_c]
-02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b],
aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b],
aggr=[FIRST_VALUE(annotated_data_infinite2.c) ORDER BY
[annotated_data_infinite2.a DESC NULLS FIRST]], ordering_mode=Sorted
03)----StreamingTableExec: partition_sizes=1, projection=[a, b, c],
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST,
c@2 ASC NULLS LAST]
query III
@@ -2204,7 +2204,7 @@ logical_plan
03)----TableScan: annotated_data_infinite2 projection=[a, b, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a, b@1 as b,
LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a
DESC NULLS FIRST]@2 as last_c]
-02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b],
aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b],
aggr=[LAST_VALUE(annotated_data_infinite2.c) ORDER BY
[annotated_data_infinite2.a DESC NULLS FIRST]], ordering_mode=Sorted
03)----StreamingTableExec: partition_sizes=1, projection=[a, b, c],
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST,
c@2 ASC NULLS LAST]
query III
@@ -2292,7 +2292,7 @@ logical_plan
01)Aggregate: groupBy=[[annotated_data_infinite2.a,
annotated_data_infinite2.b]], aggr=[[ARRAY_AGG(annotated_data_infinite2.d)
ORDER BY [annotated_data_infinite2.d ASC NULLS LAST]]]
02)--TableScan: annotated_data_infinite2 projection=[a, b, d]
physical_plan
-01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b],
aggr=[ARRAY_AGG(annotated_data_infinite2.d)], ordering_mode=Sorted
+01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b],
aggr=[ARRAY_AGG(annotated_data_infinite2.d) ORDER BY
[annotated_data_infinite2.d ASC NULLS LAST]], ordering_mode=Sorted
02)--PartialSortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,d@2 ASC
NULLS LAST], common_prefix_length=[2]
03)----StreamingTableExec: partition_sizes=1, projection=[a, b, d],
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]
@@ -2464,7 +2464,7 @@ logical_plan
03)----TableScan: sales_global projection=[country, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount)
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts]
-02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST]]
03)----SortExec: expr=[amount@1 ASC NULLS LAST]
04)------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2494,7 +2494,7 @@ logical_plan
04)------TableScan: sales_global projection=[country, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY
[s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(s.amount), SUM(s.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]
03)----SortExec: expr=[amount@1 DESC]
04)------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2538,7 +2538,7 @@ logical_plan
05)--------TableScan: sales_global projection=[country, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY
[s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)],
ordering_mode=Sorted
03)----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC]
04)------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2574,7 +2574,7 @@ logical_plan
05)--------TableScan: sales_global projection=[zip_code, country, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, zip_code@1 as zip_code,
ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@2 as amounts,
SUM(s.amount)@3 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as
zip_code], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)],
ordering_mode=PartiallySorted([0])
+02)--AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as
zip_code], aggr=[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST],
SUM(s.amount)], ordering_mode=PartiallySorted([0])
03)----SortExec: expr=[country@1 ASC NULLS LAST,amount@2 DESC]
04)------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2610,7 +2610,7 @@ logical_plan
05)--------TableScan: sales_global projection=[country, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY
[s.country DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST],
SUM(s.amount)], ordering_mode=Sorted
03)----SortExec: expr=[country@0 ASC NULLS LAST]
04)------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2645,7 +2645,7 @@ logical_plan
05)--------TableScan: sales_global projection=[country, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY
[s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST]@1 as amounts,
SUM(s.amount)@2 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC
NULLS FIRST], SUM(s.amount)], ordering_mode=Sorted
03)----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC]
04)------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2677,7 +2677,7 @@ logical_plan
03)----TableScan: sales_global projection=[country, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount)
ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts,
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount
DESC NULLS FIRST]@3 as fv2]
-02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount),
LAST_VALUE(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS
FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC
NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount
DESC NULLS FIRST]]
03)----SortExec: expr=[amount@1 DESC]
04)------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2708,7 +2708,7 @@ logical_plan
03)----TableScan: sales_global projection=[country, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount)
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts,
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount
DESC NULLS FIRST]@3 as fv2]
-02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount),
FIRST_VALUE(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST]]
03)----SortExec: expr=[amount@1 ASC NULLS LAST]
04)------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2740,7 +2740,7 @@ logical_plan
03)----TableScan: sales_global projection=[country, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country,
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount
DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY
[sales_global.amount ASC NULLS LAST]@3 as amounts]
-02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount),
ARRAY_AGG(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST]]
03)----SortExec: expr=[amount@1 ASC NULLS LAST]
04)------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2770,7 +2770,7 @@ logical_plan
03)----TableScan: sales_global projection=[country, ts, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country, SUM(sales_global.amount) ORDER
BY [sales_global.ts DESC NULLS FIRST]@1 as sum1, ARRAY_AGG(sales_global.amount)
ORDER BY [sales_global.amount ASC NULLS LAST]@2 as amounts]
-02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[SUM(sales_global.amount), ARRAY_AGG(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST],
ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]
03)----SortExec: expr=[amount@2 ASC NULLS LAST]
04)------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2805,7 +2805,7 @@ logical_plan
04)------TableScan: sales_global projection=[country, ts, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country,
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1
as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST]@3 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount),
SUM(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]
03)----MemoryExec: partitions=1, partition_sizes=[1]
query TRRR rowsort
@@ -2838,7 +2838,7 @@ logical_plan
03)----TableScan: sales_global projection=[country, ts, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country,
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1
as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST]@3 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount),
SUM(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]
03)----MemoryExec: partitions=1, partition_sizes=[1]
query TRRR rowsort
@@ -2874,7 +2874,7 @@ logical_plan
physical_plan
01)SortExec: expr=[sn@2 ASC NULLS LAST]
02)--ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0
as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn
ASC NULLS LAST]@5 as last_rate]
-03)----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code,
country@1 as country, ts@3 as ts, currency@4 as currency],
aggr=[LAST_VALUE(e.amount)]
+03)----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code,
country@1 as country, ts@3 as ts, currency@4 as currency],
aggr=[LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]]
04)------ProjectionExec: expr=[zip_code@2 as zip_code, country@3 as country,
sn@4 as sn, ts@5 as ts, currency@6 as currency, sn@0 as sn, amount@1 as amount]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2,
currency@4)], filter=ts@0 >= ts@1, projection=[sn@0, amount@3, zip_code@4,
country@5, sn@6, ts@7, currency@8]
@@ -2919,11 +2919,11 @@ physical_plan
01)SortPreservingMergeExec: [country@0 ASC NULLS LAST]
02)--SortExec: expr=[country@0 ASC NULLS LAST]
03)----ProjectionExec: expr=[country@0 as country,
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as
fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST]@2 as fv2]
-04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST]]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([country@0], 8),
input_partitions=8
07)------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
-08)--------------AggregateExec: mode=Partial, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+08)--------------AggregateExec: mode=Partial, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST]]
09)----------------MemoryExec: partitions=1, partition_sizes=[1]
query TRR
@@ -2955,11 +2955,11 @@ physical_plan
01)SortPreservingMergeExec: [country@0 ASC NULLS LAST]
02)--SortExec: expr=[country@0 ASC NULLS LAST]
03)----ProjectionExec: expr=[country@0 as country,
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as
fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST]@2 as fv2]
-04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST]]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([country@0], 8),
input_partitions=8
07)------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
-08)--------------AggregateExec: mode=Partial, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+08)--------------AggregateExec: mode=Partial, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST]]
09)----------------MemoryExec: partitions=1, partition_sizes=[1]
query TRR
@@ -2991,9 +2991,9 @@ logical_plan
03)----TableScan: sales_global projection=[ts, amount]
physical_plan
01)ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY
[sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount)
ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv2]
-02)--AggregateExec: mode=Final, gby=[],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+02)--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount)
ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount)
ORDER BY [sales_global.ts ASC NULLS LAST]]
03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+04)------AggregateExec: mode=Partial, gby=[],
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST]]
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
06)----------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3017,9 +3017,9 @@ logical_plan
03)----TableScan: sales_global projection=[ts, amount]
physical_plan
01)ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY
[sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount)
ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv2]
-02)--AggregateExec: mode=Final, gby=[],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+02)--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount)
ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount)
ORDER BY [sales_global.ts DESC NULLS FIRST]]
03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+04)------AggregateExec: mode=Partial, gby=[],
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST]]
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
06)----------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3041,9 +3041,9 @@ logical_plan
03)----TableScan: sales_global projection=[ts, amount]
physical_plan
01)ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY
[sales_global.ts ASC NULLS LAST]@0 as array_agg1]
-02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)]
+02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)
ORDER BY [sales_global.ts ASC NULLS LAST]]
03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[],
aggr=[ARRAY_AGG(sales_global.amount)]
+04)------AggregateExec: mode=Partial, gby=[],
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]
05)--------SortExec: expr=[ts@0 ASC NULLS LAST]
06)----------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
07)------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3065,9 +3065,9 @@ logical_plan
03)----TableScan: sales_global projection=[ts, amount]
physical_plan
01)ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY
[sales_global.ts DESC NULLS FIRST]@0 as array_agg1]
-02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)]
+02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)
ORDER BY [sales_global.ts DESC NULLS FIRST]]
03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[],
aggr=[ARRAY_AGG(sales_global.amount)]
+04)------AggregateExec: mode=Partial, gby=[],
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST]]
05)--------SortExec: expr=[ts@0 DESC]
06)----------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
07)------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3089,9 +3089,9 @@ logical_plan
03)----TableScan: sales_global projection=[amount]
physical_plan
01)ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY
[sales_global.amount ASC NULLS LAST]@0 as array_agg1]
-02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)]
+02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)
ORDER BY [sales_global.amount ASC NULLS LAST]]
03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[],
aggr=[ARRAY_AGG(sales_global.amount)]
+04)------AggregateExec: mode=Partial, gby=[],
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST]]
05)--------SortExec: expr=[amount@0 ASC NULLS LAST]
06)----------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
07)------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3118,10 +3118,10 @@ physical_plan
01)SortPreservingMergeExec: [country@0 ASC NULLS LAST]
02)--SortExec: expr=[country@0 ASC NULLS LAST]
03)----ProjectionExec: expr=[country@0 as country,
ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1
as array_agg1]
-04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST]]
05)--------CoalesceBatchesExec: target_batch_size=4
06)----------RepartitionExec: partitioning=Hash([country@0], 8),
input_partitions=8
-07)------------AggregateExec: mode=Partial, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount)]
+07)------------AggregateExec: mode=Partial, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST]]
08)--------------SortExec: expr=[amount@1 ASC NULLS LAST]
09)----------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
10)------------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3154,10 +3154,10 @@ physical_plan
01)SortPreservingMergeExec: [country@0 ASC NULLS LAST]
02)--SortExec: expr=[country@0 ASC NULLS LAST]
03)----ProjectionExec: expr=[country@0 as country,
ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS
FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY
[sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount)
ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
-04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount),
LAST_VALUE(sales_global.amount)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS
FIRST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC
NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC
NULLS FIRST]]
05)--------CoalesceBatchesExec: target_batch_size=4
06)----------RepartitionExec: partitioning=Hash([country@0], 8),
input_partitions=8
-07)------------AggregateExec: mode=Partial, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount),
LAST_VALUE(sales_global.amount)]
+07)------------AggregateExec: mode=Partial, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS
FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC
NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount
DESC NULLS FIRST]]
08)--------------SortExec: expr=[amount@1 DESC]
09)----------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
10)------------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3800,10 +3800,10 @@ logical_plan
03)----TableScan: multiple_ordered_table projection=[a, c, d]
physical_plan
01)ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY
[multiple_ordered_table.a ASC NULLS LAST]@1 as first_a,
LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC
NULLS FIRST]@2 as last_c]
-02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d],
aggr=[FIRST_VALUE(multiple_ordered_table.a),
LAST_VALUE(multiple_ordered_table.c)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d],
aggr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a
ASC NULLS LAST], LAST_VALUE(multiple_ordered_table.c) ORDER BY
[multiple_ordered_table.c DESC NULLS FIRST]]
03)----CoalesceBatchesExec: target_batch_size=2
04)------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8
-05)--------AggregateExec: mode=Partial, gby=[d@2 as d],
aggr=[FIRST_VALUE(multiple_ordered_table.a),
FIRST_VALUE(multiple_ordered_table.c)]
+05)--------AggregateExec: mode=Partial, gby=[d@2 as d],
aggr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a
ASC NULLS LAST], FIRST_VALUE(multiple_ordered_table.c) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]]
06)----------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
07)------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c,
d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]],
has_header=true
@@ -3870,7 +3870,7 @@ logical_plan
12)------------------TableScan: multiple_ordered_table projection=[a, d]
physical_plan
01)ProjectionExec: expr=[LAST_VALUE(l.d) ORDER BY [l.a ASC NULLS LAST]@1 as
amount_usd]
-02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n],
aggr=[LAST_VALUE(l.d)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n], aggr=[LAST_VALUE(l.d)
ORDER BY [l.a ASC NULLS LAST]], ordering_mode=Sorted
03)----CoalesceBatchesExec: target_batch_size=2
04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d@1, d@1)],
filter=CAST(a@0 AS Int64) >= CAST(a@1 AS Int64) - 10, projection=[a@0, d@1,
row_n@4]
05)--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d],
output_ordering=[a@0 ASC NULLS LAST], has_header=true
@@ -4974,7 +4974,7 @@ logical_plan
02)--Aggregate: groupBy=[[multiple_ordered_table.a,
multiple_ordered_table.b]], aggr=[[ARRAY_AGG(multiple_ordered_table.c) ORDER BY
[multiple_ordered_table.c DESC NULLS FIRST]]]
03)----TableScan: multiple_ordered_table projection=[a, b, c]
physical_plan
-01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b],
aggr=[ARRAY_AGG(multiple_ordered_table.c)], ordering_mode=Sorted
+01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b],
aggr=[ARRAY_AGG(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c
DESC NULLS FIRST]], ordering_mode=Sorted
02)--CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS
LAST]], has_header=true
query II?
diff --git a/datafusion/sqllogictest/test_files/joins.slt
b/datafusion/sqllogictest/test_files/joins.slt
index 65e6c17b92..5ef33307b5 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3362,7 +3362,7 @@ logical_plan
08)----------TableScan: annotated_data projection=[a, b]
physical_plan
01)ProjectionExec: expr=[a@0 as a, LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS
FIRST]@3 as last_col1]
-02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b, c@2 as c],
aggr=[LAST_VALUE(r.b)], ordering_mode=PartiallySorted([0])
+02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b, c@2 as c],
aggr=[LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS FIRST]],
ordering_mode=PartiallySorted([0])
03)----CoalesceBatchesExec: target_batch_size=2
04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)]
05)--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST],
has_header=true
@@ -3410,7 +3410,7 @@ logical_plan
12)------------------TableScan: multiple_ordered_table projection=[a, d]
physical_plan
01)ProjectionExec: expr=[LAST_VALUE(l.d) ORDER BY [l.a ASC NULLS LAST]@1 as
amount_usd]
-02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n],
aggr=[LAST_VALUE(l.d)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n], aggr=[LAST_VALUE(l.d)
ORDER BY [l.a ASC NULLS LAST]], ordering_mode=Sorted
03)----CoalesceBatchesExec: target_batch_size=2
04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d@1, d@1)],
filter=CAST(a@0 AS Int64) >= CAST(a@1 AS Int64) - 10, projection=[a@0, d@1,
row_n@4]
05)--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d],
output_ordering=[a@0 ASC NULLS LAST], has_header=true
@@ -3447,10 +3447,10 @@ physical_plan
01)SortPreservingMergeExec: [a@0 ASC]
02)--SortExec: expr=[a@0 ASC]
03)----ProjectionExec: expr=[a@0 as a, LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS
FIRST]@3 as last_col1]
-04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as
c], aggr=[LAST_VALUE(r.b)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as
c], aggr=[LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS FIRST]]
05)--------CoalesceBatchesExec: target_batch_size=2
06)----------RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 2),
input_partitions=2
-07)------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as
c], aggr=[LAST_VALUE(r.b)]
+07)------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as
c], aggr=[LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS FIRST]]
08)--------------CoalesceBatchesExec: target_batch_size=2
09)----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0,
a@0)]
10)------------------CoalesceBatchesExec: target_batch_size=2
diff --git a/datafusion/sqllogictest/test_files/subquery.slt
b/datafusion/sqllogictest/test_files/subquery.slt
index 155a176e85..7196418af1 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -728,6 +728,32 @@ logical_plan
07)------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]
08)--------TableScan: t2 projection=[]
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+query TT
+explain select (select count(*) from t1) as b, (select count(1) from t2)
+----
+logical_plan
+01)Projection: __scalar_sq_1.COUNT(*) AS b, __scalar_sq_2.COUNT(Int64(1)) AS
COUNT(Int64(1))
+02)--Left Join:
+03)----SubqueryAlias: __scalar_sq_1
+04)------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1)) AS COUNT(*)]]
+05)--------TableScan: t1 projection=[]
+06)----SubqueryAlias: __scalar_sq_2
+07)------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]
+08)--------TableScan: t2 projection=[]
+physical_plan
+01)ProjectionExec: expr=[COUNT(*)@0 as b, COUNT(Int64(1))@1 as COUNT(Int64(1))]
+02)--NestedLoopJoinExec: join_type=Left
+03)----ProjectionExec: expr=[4 as COUNT(*)]
+04)------PlaceholderRowExec
+05)----ProjectionExec: expr=[4 as COUNT(Int64(1))]
+06)------PlaceholderRowExec
+
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
query II
select (select count(*) from t1) as b, (select count(1) from t2)
----
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]