This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0c177d18dc Use shorter aliases in CSE (#10939)
0c177d18dc is described below
commit 0c177d18dc57fe81a7a23e1b1a41991dc88bb4f6
Author: Peter Toth <[email protected]>
AuthorDate: Tue Jun 18 12:25:28 2024 +0200
Use shorter aliases in CSE (#10939)
* initial change
* test renaming
* use counter instead of indexmap
* order slt tests
* change cse tests
* restore slt tests
* fix slt test
* formatting
* ensure no alias collision
* keep original alias numbers for collision
* ensure no collision in aggregate cse
* use `AliasGenerator` to generate aliases, use `__cse` prefix in common
expression aliases, remove `DataType` from `ExprStats` as not needed, store
aliases in `CommonExprs`, revert unnecessary changes
* use `into_values()` instead of `into_iter()` where possible
* fix docstring of `ExprStats` and `CommonExprs`
* use `__common_expr` prefix
---------
Co-authored-by: Mohamed Abdeen <[email protected]>
---
.../optimizer/src/common_subexpr_eliminate.rs | 368 +++++++++++----------
datafusion/sqllogictest/test_files/group_by.slt | 8 +-
datafusion/sqllogictest/test_files/select.slt | 16 +-
datafusion/sqllogictest/test_files/subquery.slt | 8 +-
.../sqllogictest/test_files/tpch/q1.slt.part | 6 +-
datafusion/sqllogictest/test_files/window.slt | 126 +++----
6 files changed, 271 insertions(+), 261 deletions(-)
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 3ed1309f15..e150a957bf 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -22,19 +22,19 @@ use std::sync::Arc;
use crate::{utils, OptimizerConfig, OptimizerRule};
-use arrow::datatypes::{DataType, Field};
+use datafusion_common::alias::AliasGenerator;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
TreeNodeRewriter,
TreeNodeVisitor,
};
-use datafusion_common::{
- internal_err, qualified_name, Column, DFSchema, DFSchemaRef,
DataFusionError, Result,
-};
+use datafusion_common::{qualified_name, Column, DFSchema, DataFusionError,
Result};
use datafusion_expr::expr::Alias;
use datafusion_expr::logical_plan::{Aggregate, LogicalPlan, Projection,
Window};
use datafusion_expr::{col, Expr, ExprSchemable};
use indexmap::IndexMap;
+const CSE_PREFIX: &str = "__common_expr";
+
/// Identifier that represents a subexpression tree.
///
/// Note that the current implementation contains:
@@ -79,16 +79,12 @@ type Identifier = String;
/// ```
type IdArray = Vec<(usize, Identifier)>;
-/// A map that contains statistics of expressions by their identifiers.
-/// It contains:
-/// - The number of occurrences and
-/// - The DataType
-/// of an expression.
-type ExprStats = HashMap<Identifier, (usize, DataType)>;
+/// A map that contains the number of occurrences of expressions by their
identifiers.
+type ExprStats = HashMap<Identifier, usize>;
-/// A map that contains the common expressions extracted during the second,
rewriting
-/// traversal.
-type CommonExprs = IndexMap<Identifier, Expr>;
+/// A map that contains the common expressions and their alias extracted
during the
+/// second, rewriting traversal.
+type CommonExprs = IndexMap<Identifier, (Expr, String)>;
/// Performs Common Sub-expression Elimination optimization.
///
@@ -131,6 +127,7 @@ impl CommonSubexprEliminate {
arrays_list: &[&[IdArray]],
expr_stats: &ExprStats,
common_exprs: &mut CommonExprs,
+ alias_generator: &AliasGenerator,
) -> Result<Vec<Vec<Expr>>> {
exprs_list
.iter()
@@ -141,7 +138,13 @@ impl CommonSubexprEliminate {
.cloned()
.zip(arrays.iter())
.map(|(expr, id_array)| {
- replace_common_expr(expr, id_array, expr_stats,
common_exprs)
+ replace_common_expr(
+ expr,
+ id_array,
+ expr_stats,
+ common_exprs,
+ alias_generator,
+ )
})
.collect::<Result<Vec<_>>>()
})
@@ -164,21 +167,21 @@ impl CommonSubexprEliminate {
expr_stats: &ExprStats,
config: &dyn OptimizerConfig,
) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
- let mut common_exprs = IndexMap::new();
-
+ let mut common_exprs = CommonExprs::new();
let rewrite_exprs = self.rewrite_exprs_list(
exprs_list,
arrays_list,
expr_stats,
&mut common_exprs,
+ &config.alias_generator(),
)?;
let mut new_input = self
.try_optimize(input, config)?
.unwrap_or_else(|| input.clone());
+
if !common_exprs.is_empty() {
- new_input =
- build_common_expr_project_plan(new_input, common_exprs,
expr_stats)?;
+ new_input = build_common_expr_project_plan(new_input,
common_exprs)?;
}
Ok((rewrite_exprs, new_input))
@@ -209,13 +212,7 @@ impl CommonSubexprEliminate {
} = window;
plan = input.as_ref().clone();
- let input_schema = Arc::clone(input.schema());
- let arrays = to_arrays(
- &window_expr,
- input_schema,
- &mut expr_stats,
- ExprMask::Normal,
- )?;
+ let arrays = to_arrays(&window_expr, &mut expr_stats,
ExprMask::Normal)?;
window_exprs.push(window_expr);
arrays_per_window.push(arrays);
@@ -277,15 +274,8 @@ impl CommonSubexprEliminate {
let mut expr_stats = ExprStats::new();
// rewrite inputs
- let input_schema = Arc::clone(input.schema());
- let group_arrays = to_arrays(
- group_expr,
- Arc::clone(&input_schema),
- &mut expr_stats,
- ExprMask::Normal,
- )?;
- let aggr_arrays =
- to_arrays(aggr_expr, input_schema, &mut expr_stats,
ExprMask::Normal)?;
+ let group_arrays = to_arrays(group_expr, &mut expr_stats,
ExprMask::Normal)?;
+ let aggr_arrays = to_arrays(aggr_expr, &mut expr_stats,
ExprMask::Normal)?;
let (mut new_expr, new_input) = self.rewrite_expr(
&[group_expr, aggr_expr],
@@ -303,16 +293,16 @@ impl CommonSubexprEliminate {
let new_input_schema = Arc::clone(new_input.schema());
let aggr_arrays = to_arrays(
&new_aggr_expr,
- new_input_schema.clone(),
&mut expr_stats,
ExprMask::NormalAndAggregates,
)?;
- let mut common_exprs = IndexMap::new();
+ let mut common_exprs = CommonExprs::new();
let mut rewritten = self.rewrite_exprs_list(
&[&new_aggr_expr],
&[&aggr_arrays],
&expr_stats,
&mut common_exprs,
+ &config.alias_generator(),
)?;
let rewritten = pop_expr(&mut rewritten)?;
@@ -330,11 +320,8 @@ impl CommonSubexprEliminate {
.map(LogicalPlan::Aggregate)
} else {
let mut agg_exprs = common_exprs
- .into_iter()
- .map(|(expr_id, expr)| {
- // todo: check `nullable`
- expr.alias(expr_id)
- })
+ .into_values()
+ .map(|(expr, expr_alias)| expr.alias(expr_alias))
.collect::<Vec<_>>();
let mut proj_exprs = vec![];
@@ -347,14 +334,15 @@ impl CommonSubexprEliminate {
agg_exprs.push(expr.alias(&name));
proj_exprs.push(Expr::Column(Column::from_name(name)));
} else {
- let id = expr_identifier(&expr_rewritten,
"".to_string());
+ let expr_alias =
config.alias_generator().next(CSE_PREFIX);
let (qualifier, field) =
expr_rewritten.to_field(&new_input_schema)?;
let out_name = qualified_name(qualifier.as_ref(),
field.name());
- agg_exprs.push(expr_rewritten.alias(&id));
- proj_exprs
-
.push(Expr::Column(Column::from_name(id)).alias(out_name));
+ agg_exprs.push(expr_rewritten.alias(&expr_alias));
+ proj_exprs.push(
+
Expr::Column(Column::from_name(expr_alias)).alias(out_name),
+ );
}
} else {
proj_exprs.push(expr_rewritten);
@@ -382,11 +370,10 @@ impl CommonSubexprEliminate {
let expr = plan.expressions();
let inputs = plan.inputs();
let input = inputs[0];
- let input_schema = Arc::clone(input.schema());
let mut expr_stats = ExprStats::new();
// Visit expr list and build expr identifier to occuring count map
(`expr_stats`).
- let arrays = to_arrays(&expr, input_schema, &mut expr_stats,
ExprMask::Normal)?;
+ let arrays = to_arrays(&expr, &mut expr_stats, ExprMask::Normal)?;
let (mut new_expr, new_input) =
self.rewrite_expr(&[&expr], &[&arrays], input, &expr_stats,
config)?;
@@ -477,20 +464,13 @@ fn pop_expr(new_expr: &mut Vec<Vec<Expr>>) ->
Result<Vec<Expr>> {
fn to_arrays(
expr: &[Expr],
- input_schema: DFSchemaRef,
expr_stats: &mut ExprStats,
expr_mask: ExprMask,
) -> Result<Vec<IdArray>> {
expr.iter()
.map(|e| {
let mut id_array = vec![];
- expr_to_identifier(
- e,
- expr_stats,
- &mut id_array,
- Arc::clone(&input_schema),
- expr_mask,
- )?;
+ expr_to_identifier(e, expr_stats, &mut id_array, expr_mask)?;
Ok(id_array)
})
@@ -510,19 +490,13 @@ fn to_arrays(
fn build_common_expr_project_plan(
input: LogicalPlan,
common_exprs: CommonExprs,
- expr_stats: &ExprStats,
) -> Result<LogicalPlan> {
let mut fields_set = BTreeSet::new();
let mut project_exprs = common_exprs
- .into_iter()
- .map(|(expr_id, expr)| {
- let Some((_, data_type)) = expr_stats.get(&expr_id) else {
- return internal_err!("expr_stats invalid state");
- };
- // todo: check `nullable`
- let field = Field::new(&expr_id, data_type.clone(), true);
- fields_set.insert(field.name().to_owned());
- Ok(expr.alias(expr_id))
+ .into_values()
+ .map(|(expr, expr_alias)| {
+ fields_set.insert(expr_alias.clone());
+ Ok(expr.alias(expr_alias))
})
.collect::<Result<Vec<_>>>()?;
@@ -637,9 +611,6 @@ struct ExprIdentifierVisitor<'a> {
expr_stats: &'a mut ExprStats,
// cache to speed up second traversal
id_array: &'a mut IdArray,
- // input schema for the node that we're optimizing, so we can determine
the correct datatype
- // for each subexpression
- input_schema: DFSchemaRef,
// inner states
visit_stack: Vec<VisitRecord>,
// preorder index, start from 0.
@@ -714,14 +685,7 @@ impl<'n> TreeNodeVisitor<'n> for ExprIdentifierVisitor<'_>
{
self.id_array[down_index].0 = self.up_index;
if !self.expr_mask.ignores(expr) {
self.id_array[down_index].1.clone_from(&expr_id);
-
- // TODO: can we capture the data type in the second traversal only
for
- // replaced expressions?
- let data_type = expr.get_type(&self.input_schema)?;
- let (count, _) = self
- .expr_stats
- .entry(expr_id.clone())
- .or_insert((0, data_type));
+ let count = self.expr_stats.entry(expr_id.clone()).or_insert(0);
*count += 1;
}
self.visit_stack.push(VisitRecord::ExprItem(expr_id));
@@ -740,13 +704,11 @@ fn expr_to_identifier(
expr: &Expr,
expr_stats: &mut ExprStats,
id_array: &mut IdArray,
- input_schema: DFSchemaRef,
expr_mask: ExprMask,
) -> Result<()> {
expr.visit(&mut ExprIdentifierVisitor {
expr_stats,
id_array,
- input_schema,
visit_stack: vec![],
down_index: 0,
up_index: 0,
@@ -771,6 +733,8 @@ struct CommonSubexprRewriter<'a> {
down_index: usize,
// how many aliases have we seen so far
alias_counter: usize,
+ // alias generator for extracted common expressions
+ alias_generator: &'a AliasGenerator,
}
impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
@@ -780,17 +744,18 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
if matches!(expr, Expr::Alias(_)) {
self.alias_counter -= 1
}
+
Ok(Transformed::no(expr))
}
fn f_down(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
- // The `CommonSubexprRewriter` relies on `ExprIdentifierVisitor` to
generate
- // the `id_array`, which records the expr's identifier used to rewrite
expr. So if we
- // skip an expr in `ExprIdentifierVisitor`, we should skip it here,
too.
if matches!(expr, Expr::Alias(_)) {
self.alias_counter += 1;
}
+ // The `CommonSubexprRewriter` relies on `ExprIdentifierVisitor` to
generate
+ // the `id_array`, which records the expr's identifier used to rewrite
expr. So if we
+ // skip an expr in `ExprIdentifierVisitor`, we should skip it here,
too.
if expr.short_circuits() || expr.is_volatile()? {
return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump));
}
@@ -803,8 +768,8 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
return Ok(Transformed::no(expr));
}
- let (counter, _) = self.expr_stats.get(expr_id).unwrap();
- if *counter > 1 {
+ let count = self.expr_stats.get(expr_id).unwrap();
+ if *count > 1 {
// step index to skip all sub-node (which has smaller series
number).
while self.down_index < self.id_array.len()
&& self.id_array[self.down_index].0 < *up_index
@@ -813,14 +778,18 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
}
let expr_name = expr.display_name()?;
- self.common_exprs.insert(expr_id.clone(), expr);
+ let (_, expr_alias) =
+ self.common_exprs.entry(expr_id.clone()).or_insert_with(|| {
+ let expr_alias = self.alias_generator.next(CSE_PREFIX);
+ (expr, expr_alias)
+ });
// alias the expressions without an `Alias` ancestor node
let rewritten = if self.alias_counter > 0 {
- col(expr_id)
+ col(expr_alias.clone())
} else {
self.alias_counter += 1;
- col(expr_id).alias(expr_name)
+ col(expr_alias.clone()).alias(expr_name)
};
Ok(Transformed::new(rewritten, true, TreeNodeRecursion::Jump))
@@ -837,6 +806,7 @@ fn replace_common_expr(
id_array: &IdArray,
expr_stats: &ExprStats,
common_exprs: &mut CommonExprs,
+ alias_generator: &AliasGenerator,
) -> Result<Expr> {
expr.rewrite(&mut CommonSubexprRewriter {
expr_stats,
@@ -844,6 +814,7 @@ fn replace_common_expr(
common_exprs,
down_index: 0,
alias_counter: 0,
+ alias_generator,
})
.data()
}
@@ -852,7 +823,7 @@ fn replace_common_expr(
mod test {
use std::iter;
- use arrow::datatypes::Schema;
+ use arrow::datatypes::{DataType, Field, Schema};
use datafusion_expr::logical_plan::{table_scan, JoinType};
@@ -868,10 +839,16 @@ mod test {
use super::*;
- fn assert_optimized_plan_eq(expected: &str, plan: &LogicalPlan) {
+ fn assert_optimized_plan_eq(
+ expected: &str,
+ plan: &LogicalPlan,
+ config: Option<&dyn OptimizerConfig>,
+ ) {
let optimizer = CommonSubexprEliminate {};
+ let default_config = OptimizerContext::new();
+ let config = config.unwrap_or(&default_config);
let optimized_plan = optimizer
- .try_optimize(plan, &OptimizerContext::new())
+ .try_optimize(plan, config)
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{optimized_plan:?}");
@@ -882,24 +859,9 @@ mod test {
fn id_array_visitor() -> Result<()> {
let expr = ((sum(col("a") + lit(1))) - avg(col("c"))) * lit(2);
- let schema = Arc::new(DFSchema::from_unqualifed_fields(
- vec![
- Field::new("a", DataType::Int64, false),
- Field::new("c", DataType::Int64, false),
- ]
- .into(),
- Default::default(),
- )?);
-
// skip aggregates
let mut id_array = vec![];
- expr_to_identifier(
- &expr,
- &mut HashMap::new(),
- &mut id_array,
- Arc::clone(&schema),
- ExprMask::Normal,
- )?;
+ expr_to_identifier(&expr, &mut HashMap::new(), &mut id_array,
ExprMask::Normal)?;
let expected = vec![
(8, "{(sum(a + Int32(1)) - AVG(c)) * Int32(2)|{Int32(2)}|{sum(a +
Int32(1)) - AVG(c)|{AVG(c)|{c}}|{sum(a + Int32(1))|{a +
Int32(1)|{Int32(1)}|{a}}}}}"),
@@ -923,7 +885,6 @@ mod test {
&expr,
&mut HashMap::new(),
&mut id_array,
- Arc::clone(&schema),
ExprMask::NormalAndAggregates,
)?;
@@ -968,11 +929,11 @@ mod test {
)?
.build()?;
- let expected = "Aggregate: groupBy=[[]], aggr=[[sum({test.a *
(Int32(1) - test.b)|{Int32(1) - test.b|{test.b}|{Int32(1)}}|{test.a}} AS test.a
* Int32(1) - test.b), sum({test.a * (Int32(1) - test.b)|{Int32(1) -
test.b|{test.b}|{Int32(1)}}|{test.a}} AS test.a * Int32(1) - test.b * (Int32(1)
+ test.c))]]\
- \n Projection: test.a * (Int32(1) - test.b) AS {test.a * (Int32(1) -
test.b)|{Int32(1) - test.b|{test.b}|{Int32(1)}}|{test.a}}, test.a, test.b,
test.c\
+ let expected = "Aggregate: groupBy=[[]], aggr=[[sum(__common_expr_1 AS
test.a * Int32(1) - test.b), sum(__common_expr_1 AS test.a * Int32(1) - test.b
* (Int32(1) + test.c))]]\
+ \n Projection: test.a * (Int32(1) - test.b) AS __common_expr_1,
test.a, test.b, test.c\
\n TableScan: test";
- assert_optimized_plan_eq(expected, &plan);
+ assert_optimized_plan_eq(expected, &plan, None);
Ok(())
}
@@ -988,11 +949,11 @@ mod test {
])?
.build()?;
- let expected = "Projection: {test.a + test.b|{test.b}|{test.a}} -
test.c AS alias1 * {test.a + test.b|{test.b}|{test.a}} AS test.a + test.b,
{test.a + test.b|{test.b}|{test.a}} AS test.a + test.b\
- \n Projection: test.a + test.b AS {test.a +
test.b|{test.b}|{test.a}}, test.a, test.b, test.c\
+ let expected = "Projection: __common_expr_1 - test.c AS alias1 *
__common_expr_1 AS test.a + test.b, __common_expr_1 AS test.a + test.b\
+ \n Projection: test.a + test.b AS __common_expr_1, test.a, test.b,
test.c\
\n TableScan: test";
- assert_optimized_plan_eq(expected, &plan);
+ assert_optimized_plan_eq(expected, &plan, None);
Ok(())
}
@@ -1041,11 +1002,11 @@ mod test {
)?
.build()?;
- let expected = "Projection: {AVG(test.a)|{test.a}} AS col1,
{AVG(test.a)|{test.a}} AS col2, col3, {AVG(test.c)} AS AVG(test.c),
{my_agg(test.a)|{test.a}} AS col4, {my_agg(test.a)|{test.a}} AS col5, col6,
{my_agg(test.c)} AS my_agg(test.c)\
- \n Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS
{AVG(test.a)|{test.a}}, my_agg(test.a) AS {my_agg(test.a)|{test.a}},
AVG(test.b) AS col3, AVG(test.c) AS {AVG(test.c)}, my_agg(test.b) AS col6,
my_agg(test.c) AS {my_agg(test.c)}]]\
+ let expected = "Projection: __common_expr_1 AS col1, __common_expr_1
AS col2, col3, __common_expr_3 AS AVG(test.c), __common_expr_2 AS col4,
__common_expr_2 AS col5, col6, __common_expr_4 AS my_agg(test.c)\
+ \n Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS __common_expr_1,
my_agg(test.a) AS __common_expr_2, AVG(test.b) AS col3, AVG(test.c) AS
__common_expr_3, my_agg(test.b) AS col6, my_agg(test.c) AS __common_expr_4]]\
\n TableScan: test";
- assert_optimized_plan_eq(expected, &plan);
+ assert_optimized_plan_eq(expected, &plan, None);
// test: trafo after aggregate
let plan = LogicalPlanBuilder::from(table_scan.clone())
@@ -1060,11 +1021,11 @@ mod test {
)?
.build()?;
- let expected = "Projection: Int32(1) + {AVG(test.a)|{test.a}} AS
AVG(test.a), Int32(1) - {AVG(test.a)|{test.a}} AS AVG(test.a), Int32(1) +
{my_agg(test.a)|{test.a}} AS my_agg(test.a), Int32(1) -
{my_agg(test.a)|{test.a}} AS my_agg(test.a)\
- \n Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS
{AVG(test.a)|{test.a}}, my_agg(test.a) AS {my_agg(test.a)|{test.a}}]]\
+ let expected = "Projection: Int32(1) + __common_expr_1 AS AVG(test.a),
Int32(1) - __common_expr_1 AS AVG(test.a), Int32(1) + __common_expr_2 AS
my_agg(test.a), Int32(1) - __common_expr_2 AS my_agg(test.a)\
+ \n Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS __common_expr_1,
my_agg(test.a) AS __common_expr_2]]\
\n TableScan: test";
- assert_optimized_plan_eq(expected, &plan);
+ assert_optimized_plan_eq(expected, &plan, None);
// test: transformation before aggregate
let plan = LogicalPlanBuilder::from(table_scan.clone())
@@ -1077,9 +1038,11 @@ mod test {
)?
.build()?;
- let expected = "Aggregate: groupBy=[[]], aggr=[[AVG({UInt32(1) +
test.a|{test.a}|{UInt32(1)}}) AS col1, my_agg({UInt32(1) +
test.a|{test.a}|{UInt32(1)}}) AS col2]]\n Projection: UInt32(1) + test.a AS
{UInt32(1) + test.a|{test.a}|{UInt32(1)}}, test.a, test.b, test.c\n
TableScan: test";
+ let expected ="Aggregate: groupBy=[[]], aggr=[[AVG(__common_expr_1) AS
col1, my_agg(__common_expr_1) AS col2]]\
+ \n Projection: UInt32(1) + test.a AS __common_expr_1, test.a, test.b,
test.c\
+ \n TableScan: test";
- assert_optimized_plan_eq(expected, &plan);
+ assert_optimized_plan_eq(expected, &plan, None);
// test: common between agg and group
let plan = LogicalPlanBuilder::from(table_scan.clone())
@@ -1092,11 +1055,11 @@ mod test {
)?
.build()?;
- let expected = "Aggregate: groupBy=[[{UInt32(1) +
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a]], aggr=[[AVG({UInt32(1) +
test.a|{test.a}|{UInt32(1)}}) AS col1, my_agg({UInt32(1) +
test.a|{test.a}|{UInt32(1)}}) AS col2]]\
- \n Projection: UInt32(1) + test.a AS {UInt32(1) +
test.a|{test.a}|{UInt32(1)}}, test.a, test.b, test.c\
+ let expected = "Aggregate: groupBy=[[__common_expr_1 AS UInt32(1) +
test.a]], aggr=[[AVG(__common_expr_1) AS col1, my_agg(__common_expr_1) AS
col2]]\
+ \n Projection: UInt32(1) + test.a AS __common_expr_1, test.a, test.b,
test.c\
\n TableScan: test";
- assert_optimized_plan_eq(expected, &plan);
+ assert_optimized_plan_eq(expected, &plan, None);
// test: all mixed
let plan = LogicalPlanBuilder::from(table_scan)
@@ -1113,18 +1076,18 @@ mod test {
)?
.build()?;
- let expected = "Projection: UInt32(1) + test.a, UInt32(1) +
{AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}})|{{UInt32(1) +
test.a|{test.a}|{UInt32(1)}}}} AS col1, UInt32(1) - {AVG({UInt32(1) +
test.a|{test.a}|{UInt32(1)}})|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}} AS
col2, {AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a)} AS
AVG(UInt32(1) + test.a), UInt32(1) + {my_agg({UInt32(1) +
test.a|{test.a}|{UInt32(1)}})|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}} A
[...]
- \n Aggregate: groupBy=[[{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS
UInt32(1) + test.a]], aggr=[[AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}}) AS
{AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}})|{{UInt32(1) +
test.a|{test.a}|{UInt32(1)}}}}, my_agg({UInt32(1) +
test.a|{test.a}|{UInt32(1)}}) AS {my_agg({UInt32(1) +
test.a|{test.a}|{UInt32(1)}})|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}},
AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS
{AVG({UInt32(1) + t [...]
- \n Projection: UInt32(1) + test.a AS {UInt32(1) +
test.a|{test.a}|{UInt32(1)}}, test.a, test.b, test.c\
+ let expected = "Projection: UInt32(1) + test.a, UInt32(1) +
__common_expr_2 AS col1, UInt32(1) - __common_expr_2 AS col2, __common_expr_4
AS AVG(UInt32(1) + test.a), UInt32(1) + __common_expr_3 AS col3, UInt32(1) -
__common_expr_3 AS col4, __common_expr_5 AS my_agg(UInt32(1) + test.a)\
+ \n Aggregate: groupBy=[[__common_expr_1 AS UInt32(1) + test.a]],
aggr=[[AVG(__common_expr_1) AS __common_expr_2, my_agg(__common_expr_1) AS
__common_expr_3, AVG(__common_expr_1 AS UInt32(1) + test.a) AS __common_expr_4,
my_agg(__common_expr_1 AS UInt32(1) + test.a) AS __common_expr_5]]\
+ \n Projection: UInt32(1) + test.a AS __common_expr_1, test.a,
test.b, test.c\
\n TableScan: test";
- assert_optimized_plan_eq(expected, &plan);
+ assert_optimized_plan_eq(expected, &plan, None);
Ok(())
}
#[test]
- fn aggregate_with_releations_and_dots() -> Result<()> {
+ fn aggregate_with_relations_and_dots() -> Result<()> {
let schema = Schema::new(vec![Field::new("col.a", DataType::UInt32,
false)]);
let table_scan = table_scan(Some("table.test"), &schema,
None)?.build()?;
@@ -1140,12 +1103,12 @@ mod test {
)?
.build()?;
- let expected = "Projection: table.test.col.a, UInt32(1) +
{AVG({UInt32(1) + table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1)
+ table.test.col.a)|{{UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) +
table.test.col.a|{{UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}}}}} AS AVG(UInt32(1) +
table.test.col.a), {AVG({UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) +
table.test.col.a)|{{UInt32(1) + table.test.co [...]
- \n Aggregate: groupBy=[[table.test.col.a]], aggr=[[AVG({UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) +
table.test.col.a) AS {AVG({UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) +
table.test.col.a)|{{UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) +
table.test.col.a|{{UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}}}}}]]\
- \n Projection: UInt32(1) + table.test.col.a AS {UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}}, table.test.col.a\
+ let expected = "Projection: table.test.col.a, UInt32(1) +
__common_expr_2 AS AVG(UInt32(1) + table.test.col.a), __common_expr_2 AS
AVG(UInt32(1) + table.test.col.a)\
+ \n Aggregate: groupBy=[[table.test.col.a]],
aggr=[[AVG(__common_expr_1 AS UInt32(1) + table.test.col.a) AS
__common_expr_2]]\
+ \n Projection: UInt32(1) + table.test.col.a AS __common_expr_1,
table.test.col.a\
\n TableScan: table.test";
- assert_optimized_plan_eq(expected, &plan);
+ assert_optimized_plan_eq(expected, &plan, None);
Ok(())
}
@@ -1161,11 +1124,11 @@ mod test {
])?
.build()?;
- let expected = "Projection: {Int32(1) + test.a|{test.a}|{Int32(1)}} AS
first, {Int32(1) + test.a|{test.a}|{Int32(1)}} AS second\
- \n Projection: Int32(1) + test.a AS {Int32(1) +
test.a|{test.a}|{Int32(1)}}, test.a, test.b, test.c\
+ let expected = "Projection: __common_expr_1 AS first, __common_expr_1
AS second\
+ \n Projection: Int32(1) + test.a AS __common_expr_1, test.a, test.b,
test.c\
\n TableScan: test";
- assert_optimized_plan_eq(expected, &plan);
+ assert_optimized_plan_eq(expected, &plan, None);
Ok(())
}
@@ -1181,7 +1144,7 @@ mod test {
let expected = "Projection: Int32(1) + test.a, test.a + Int32(1)\
\n TableScan: test";
- assert_optimized_plan_eq(expected, &plan);
+ assert_optimized_plan_eq(expected, &plan, None);
Ok(())
}
@@ -1199,35 +1162,35 @@ mod test {
\n Projection: Int32(1) + test.a, test.a\
\n TableScan: test";
- assert_optimized_plan_eq(expected, &plan);
+ assert_optimized_plan_eq(expected, &plan, None);
Ok(())
}
#[test]
fn redundant_project_fields() {
let table_scan = test_table_scan().unwrap();
- let expr_stats_1 = ExprStats::from([
- ("c+a".to_string(), (1, DataType::UInt32)),
- ("b+a".to_string(), (1, DataType::UInt32)),
- ]);
- let common_exprs_1 = IndexMap::from([
- ("c+a".to_string(), col("c") + col("a")),
- ("b+a".to_string(), col("b") + col("a")),
- ]);
- let exprs_stats_2 = ExprStats::from([
- ("c+a".to_string(), (1, DataType::UInt32)),
- ("b+a".to_string(), (1, DataType::UInt32)),
+ let common_exprs_1 = CommonExprs::from([
+ (
+ "c+a".to_string(),
+ (col("c") + col("a"), format!("{CSE_PREFIX}_1")),
+ ),
+ (
+ "b+a".to_string(),
+ (col("b") + col("a"), format!("{CSE_PREFIX}_2")),
+ ),
]);
- let common_exprs_2 = IndexMap::from([
- ("c+a".to_string(), col("c+a")),
- ("b+a".to_string(), col("b+a")),
+ let common_exprs_2 = CommonExprs::from([
+ (
+ "c+a".to_string(),
+ (col(format!("{CSE_PREFIX}_1")), format!("{CSE_PREFIX}_3")),
+ ),
+ (
+ "b+a".to_string(),
+ (col(format!("{CSE_PREFIX}_2")), format!("{CSE_PREFIX}_4")),
+ ),
]);
- let project =
- build_common_expr_project_plan(table_scan, common_exprs_1,
&expr_stats_1)
- .unwrap();
- let project_2 =
- build_common_expr_project_plan(project, common_exprs_2,
&exprs_stats_2)
- .unwrap();
+ let project = build_common_expr_project_plan(table_scan,
common_exprs_1).unwrap();
+ let project_2 = build_common_expr_project_plan(project,
common_exprs_2).unwrap();
let mut field_set = BTreeSet::new();
for name in project_2.schema().field_names() {
@@ -1244,33 +1207,28 @@ mod test {
.unwrap()
.build()
.unwrap();
- let expr_stats_1 = ExprStats::from([
- ("test1.c+test1.a".to_string(), (1, DataType::UInt32)),
- ("test1.b+test1.a".to_string(), (1, DataType::UInt32)),
- ]);
- let common_exprs_1 = IndexMap::from([
+ let common_exprs_1 = CommonExprs::from([
(
"test1.c+test1.a".to_string(),
- col("test1.c") + col("test1.a"),
+ (col("test1.c") + col("test1.a"), format!("{CSE_PREFIX}_1")),
),
(
"test1.b+test1.a".to_string(),
- col("test1.b") + col("test1.a"),
+ (col("test1.b") + col("test1.a"), format!("{CSE_PREFIX}_2")),
),
]);
- let expr_stats_2 = ExprStats::from([
- ("test1.c+test1.a".to_string(), (1, DataType::UInt32)),
- ("test1.b+test1.a".to_string(), (1, DataType::UInt32)),
- ]);
- let common_exprs_2 = IndexMap::from([
- ("test1.c+test1.a".to_string(), col("test1.c+test1.a")),
- ("test1.b+test1.a".to_string(), col("test1.b+test1.a")),
+ let common_exprs_2 = CommonExprs::from([
+ (
+ "test1.c+test1.a".to_string(),
+ (col(format!("{CSE_PREFIX}_1")), format!("{CSE_PREFIX}_3")),
+ ),
+ (
+ "test1.b+test1.a".to_string(),
+ (col(format!("{CSE_PREFIX}_2")), format!("{CSE_PREFIX}_4")),
+ ),
]);
- let project =
- build_common_expr_project_plan(join, common_exprs_1,
&expr_stats_1).unwrap();
- let project_2 =
- build_common_expr_project_plan(project, common_exprs_2,
&expr_stats_2)
- .unwrap();
+ let project = build_common_expr_project_plan(join,
common_exprs_1).unwrap();
+ let project_2 = build_common_expr_project_plan(project,
common_exprs_2).unwrap();
let mut field_set = BTreeSet::new();
for name in project_2.schema().field_names() {
@@ -1337,11 +1295,11 @@ mod test {
.build()?;
let expected = "Projection: test.a, test.b, test.c\
- \n Filter: {Int32(1) + test.a|{test.a}|{Int32(1)}} - Int32(10) >
{Int32(1) + test.a|{test.a}|{Int32(1)}}\
- \n Projection: Int32(1) + test.a AS {Int32(1) +
test.a|{test.a}|{Int32(1)}}, test.a, test.b, test.c\
+ \n Filter: __common_expr_1 - Int32(10) > __common_expr_1\
+ \n Projection: Int32(1) + test.a AS __common_expr_1, test.a,
test.b, test.c\
\n TableScan: test";
- assert_optimized_plan_eq(expected, &plan);
+ assert_optimized_plan_eq(expected, &plan, None);
Ok(())
}
@@ -1383,6 +1341,58 @@ mod test {
Ok(())
}
+ #[test]
+ fn test_alias_collision() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let config = &OptimizerContext::new();
+ let common_expr_1 = config.alias_generator().next(CSE_PREFIX);
+ let plan = LogicalPlanBuilder::from(table_scan.clone())
+ .project(vec![
+ (col("a") + col("b")).alias(common_expr_1.clone()),
+ col("c"),
+ ])?
+ .project(vec![
+ col(common_expr_1.clone()).alias("c1"),
+ col(common_expr_1).alias("c2"),
+ (col("c") + lit(2)).alias("c3"),
+ (col("c") + lit(2)).alias("c4"),
+ ])?
+ .build()?;
+
+ let expected = "Projection: __common_expr_1 AS c1, __common_expr_1 AS
c2, __common_expr_2 AS c3, __common_expr_2 AS c4\
+ \n Projection: test.c + Int32(2) AS __common_expr_2, __common_expr_1,
test.c\
+ \n Projection: test.a + test.b AS __common_expr_1, test.c\
+ \n TableScan: test";
+
+ assert_optimized_plan_eq(expected, &plan, Some(config));
+
+ let config = &OptimizerContext::new();
+ let _common_expr_1 = config.alias_generator().next(CSE_PREFIX);
+ let common_expr_2 = config.alias_generator().next(CSE_PREFIX);
+ let plan = LogicalPlanBuilder::from(table_scan.clone())
+ .project(vec![
+ (col("a") + col("b")).alias(common_expr_2.clone()),
+ col("c"),
+ ])?
+ .project(vec![
+ col(common_expr_2.clone()).alias("c1"),
+ col(common_expr_2).alias("c2"),
+ (col("c") + lit(2)).alias("c3"),
+ (col("c") + lit(2)).alias("c4"),
+ ])?
+ .build()?;
+
+ let expected = "Projection: __common_expr_2 AS c1, __common_expr_2 AS
c2, __common_expr_3 AS c3, __common_expr_3 AS c4\
+ \n Projection: test.c + Int32(2) AS __common_expr_3, __common_expr_2,
test.c\
+ \n Projection: test.a + test.b AS __common_expr_2, test.c\
+ \n TableScan: test";
+
+ assert_optimized_plan_eq(expected, &plan, Some(config));
+
+ Ok(())
+ }
+
#[test]
fn test_extract_expressions_from_col() -> Result<()> {
let mut result = Vec::with_capacity(1);
diff --git a/datafusion/sqllogictest/test_files/group_by.slt
b/datafusion/sqllogictest/test_files/group_by.slt
index 9e8a2450e0..8ccf3ae853 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -4187,8 +4187,8 @@ EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)),
MAX(DISTINCT CAST(x AS DOUBLE))
logical_plan
01)Projection: sum(alias1) AS sum(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT
t1.x)
02)--Aggregate: groupBy=[[t1.y]], aggr=[[sum(alias1), MAX(alias1)]]
-03)----Aggregate: groupBy=[[t1.y, {CAST(t1.x AS Float64)|{t1.x}} AS t1.x AS
alias1]], aggr=[[]]
-04)------Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}},
t1.y
+03)----Aggregate: groupBy=[[t1.y, __common_expr_1 AS t1.x AS alias1]],
aggr=[[]]
+04)------Projection: CAST(t1.x AS Float64) AS __common_expr_1, t1.y
05)--------TableScan: t1 projection=[x, y]
physical_plan
01)ProjectionExec: expr=[sum(alias1)@1 as sum(DISTINCT t1.x), MAX(alias1)@2 as
MAX(DISTINCT t1.x)]
@@ -4200,8 +4200,8 @@ physical_plan
07)------------CoalesceBatchesExec: target_batch_size=2
08)--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8),
input_partitions=8
09)----------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
-10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, {CAST(t1.x AS
Float64)|{t1.x}}@0 as alias1], aggr=[]
-11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as
{CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
+10)------------------AggregateExec: mode=Partial, gby=[y@1 as y,
__common_expr_1@0 as alias1], aggr=[]
+11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as
__common_expr_1, y@1 as y]
12)----------------------MemoryExec: partitions=1, partition_sizes=[1]
# create an unbounded table that contains ordered timestamp.
diff --git a/datafusion/sqllogictest/test_files/select.slt
b/datafusion/sqllogictest/test_files/select.slt
index aae35c1ce7..c8ef2b7f5e 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1436,12 +1436,12 @@ query TT
EXPLAIN SELECT x/2, x/2+1 FROM t;
----
logical_plan
-01)Projection: {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2), {t.x /
Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2) + Int64(1)
-02)--Projection: t.x / Int64(2) AS {t.x / Int64(2)|{Int64(2)}|{t.x}}
+01)Projection: __common_expr_1 AS t.x / Int64(2), __common_expr_1 AS t.x /
Int64(2) + Int64(1)
+02)--Projection: t.x / Int64(2) AS __common_expr_1
03)----TableScan: t projection=[x]
physical_plan
-01)ProjectionExec: expr=[{t.x / Int64(2)|{Int64(2)}|{t.x}}@0 as t.x /
Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}}@0 + 1 as t.x / Int64(2) + Int64(1)]
-02)--ProjectionExec: expr=[x@0 / 2 as {t.x / Int64(2)|{Int64(2)}|{t.x}}]
+01)ProjectionExec: expr=[__common_expr_1@0 as t.x / Int64(2),
__common_expr_1@0 + 1 as t.x / Int64(2) + Int64(1)]
+02)--ProjectionExec: expr=[x@0 / 2 as __common_expr_1]
03)----MemoryExec: partitions=1, partition_sizes=[1]
query II
@@ -1454,12 +1454,12 @@ query TT
EXPLAIN SELECT abs(x), abs(x) + abs(y) FROM t;
----
logical_plan
-01)Projection: {abs(t.x)|{t.x}} AS abs(t.x), {abs(t.x)|{t.x}} AS abs(t.x) +
abs(t.y)
-02)--Projection: abs(t.x) AS {abs(t.x)|{t.x}}, t.y
+01)Projection: __common_expr_1 AS abs(t.x), __common_expr_1 AS abs(t.x) +
abs(t.y)
+02)--Projection: abs(t.x) AS __common_expr_1, t.y
03)----TableScan: t projection=[x, y]
physical_plan
-01)ProjectionExec: expr=[{abs(t.x)|{t.x}}@0 as abs(t.x), {abs(t.x)|{t.x}}@0 +
abs(y@1) as abs(t.x) + abs(t.y)]
-02)--ProjectionExec: expr=[abs(x@0) as {abs(t.x)|{t.x}}, y@1 as y]
+01)ProjectionExec: expr=[__common_expr_1@0 as abs(t.x), __common_expr_1@0 +
abs(y@1) as abs(t.x) + abs(t.y)]
+02)--ProjectionExec: expr=[abs(x@0) as __common_expr_1, y@1 as y]
03)----MemoryExec: partitions=1, partition_sizes=[1]
query II
diff --git a/datafusion/sqllogictest/test_files/subquery.slt
b/datafusion/sqllogictest/test_files/subquery.slt
index eb0904b230..f325d55676 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -1082,8 +1082,8 @@ query TT
explain select a/2, a/2 + 1 from t
----
logical_plan
-01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a /
Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
-02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
+01)Projection: __common_expr_1 AS t.a / Int64(2), __common_expr_1 AS t.a /
Int64(2) + Int64(1)
+02)--Projection: t.a / Int64(2) AS __common_expr_1
03)----TableScan: t projection=[a]
statement ok
@@ -1093,8 +1093,8 @@ query TT
explain select a/2, a/2 + 1 from t
----
logical_plan
-01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a /
Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
-02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
+01)Projection: __common_expr_1 AS t.a / Int64(2), __common_expr_1 AS t.a /
Int64(2) + Int64(1)
+02)--Projection: t.a / Int64(2) AS __common_expr_1
03)----TableScan: t projection=[a]
###
diff --git a/datafusion/sqllogictest/test_files/tpch/q1.slt.part
b/datafusion/sqllogictest/test_files/tpch/q1.slt.part
index 5e0930b992..5a21bdf276 100644
--- a/datafusion/sqllogictest/test_files/tpch/q1.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q1.slt.part
@@ -42,8 +42,8 @@ explain select
logical_plan
01)Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS
LAST
02)--Projection: lineitem.l_returnflag, lineitem.l_linestatus,
sum(lineitem.l_quantity) AS sum_qty, sum(lineitem.l_extendedprice) AS
sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
AS sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge,
AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS
avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(*) AS count_order
-03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]],
aggr=[[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice),
sum({lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount)|{Decimal128(Some(1),20,0) -
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}})
AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount),
sum({lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discou
[...]
-04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount) AS {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount)|{Decimal128(Some(1),20,0) -
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}},
lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount,
lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus
+03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]],
aggr=[[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice),
sum(__common_expr_1) AS sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount), sum(__common_expr_1 * (Decimal128(Some(1),20,0) +
lineitem.l_tax)) AS sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity),
AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(Int64(1)) AS
[...]
+04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount) AS __common_expr_1, lineitem.l_quantity,
lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax,
lineitem.l_returnflag, lineitem.l_linestatus
05)--------Filter: lineitem.l_shipdate <= Date32("1998-09-02")
06)----------TableScan: lineitem projection=[l_quantity, l_extendedprice,
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate],
partial_filters=[lineitem.l_shipdate <= Date32("1998-09-02")]
physical_plan
@@ -54,7 +54,7 @@ physical_plan
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([l_returnflag@0,
l_linestatus@1], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[l_returnflag@5 as
l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity),
sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity),
AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(*)]
-08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 -
l_discount@2) as {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount)|{Decimal128(Some(1),20,0) -
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}},
l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2
as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5
as l_linestatus]
+08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 -
l_discount@2) as __common_expr_1, l_quantity@0 as l_quantity, l_extendedprice@1
as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax,
l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
09)----------------CoalesceBatchesExec: target_batch_size=8192
10)------------------FilterExec: l_shipdate@6 <= 1998-09-02
11)--------------------CsvExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
projection=[l_quantity, l_extendedprice, l_discount, l_tax, l [...]
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index 99f92b65c3..77b839f3f7 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1718,20 +1718,20 @@ EXPLAIN SELECT c3,
logical_plan
01)Projection: aggregate_test_100.c3, sum(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1,
sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
02)--Limit: skip=0, fetch=5
-03)----WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY
[{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS
aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW AS sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING [...]
-04)------Projection: {aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}},
aggregate_test_100.c3, aggregate_test_100.c9, sum(aggregate_test_100.c9) ORDER
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-05)--------WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY
[{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS
aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DE [...]
-06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS
{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}},
aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
+03)----WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY
[__common_expr_1 AS aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS
LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+04)------Projection: __common_expr_1, aggregate_test_100.c3,
aggregate_test_100.c9, sum(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+05)--------WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY
[__common_expr_1 AS aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS
FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS
sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST,
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PR [...]
+06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS
__common_expr_1, aggregate_test_100.c2, aggregate_test_100.c3,
aggregate_test_100.c9
07)------------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
physical_plan
01)ProjectionExec: expr=[c3@1 as c3, sum(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1,
sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
02)--GlobalLimitExec: skip=0, fetch=5
03)----WindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW: Ok(Field { name: "sum(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered:
false, [...]
-04)------ProjectionExec: expr=[{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 as
{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c3@2 as
c3, c9@3 as c9, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST,
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@4 as sum(a [...]
+04)------ProjectionExec: expr=[__common_expr_1@0 as __common_expr_1, c3@2 as
c3, c9@3 as c9, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST,
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@4 as sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST,
aggregate_test_100.c2 AS [...]
05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST,
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRE [...]
-06)----------SortPreservingMergeExec: [{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0
DESC,c9@3 DESC,c2@1 ASC NULLS LAST]
-07)------------SortExec: expr=[{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0
DESC,c9@3 DESC,c2@1 ASC NULLS LAST], preserve_partitioning=[true]
-08)--------------ProjectionExec: expr=[c3@1 + c4@2 as {aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c2@0 as
c2, c3@1 as c3, c9@3 as c9]
+06)----------SortPreservingMergeExec: [__common_expr_1@0 DESC,c9@3 DESC,c2@1
ASC NULLS LAST]
+07)------------SortExec: expr=[__common_expr_1@0 DESC,c9@3 DESC,c2@1 ASC NULLS
LAST], preserve_partitioning=[true]
+08)--------------ProjectionExec: expr=[c3@1 + c4@2 as __common_expr_1, c2@0 as
c2, c3@1 as c3, c9@3 as c9]
09)----------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
10)------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2,
c3, c4, c9], has_header=true
@@ -2574,11 +2574,11 @@ logical_plan
02)--Limit: skip=0, fetch=5
03)----Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
04)------Projection: sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING AS sum1, sum(annotated_data_finite.desc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1
FOLLOWING AS sum2, sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10
FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_dat
[...]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite.desc_col
AS Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col)
ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) ROWS BETWEEN 8
PRECEDING AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
-06)----------Projection: {CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}}, annotated_data_finite.inc_col,
sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING,
sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING,
sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RO [...]
-07)------------WindowAggr:
windowExpr=[[sum({CAST(annotated_data_finite.inc_col AS
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING, sum({CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING
AND 1 FOLLOWING, sum({CAST(annotated_data_f [...]
-08)--------------WindowAggr:
windowExpr=[[sum({CAST(annotated_data_finite.inc_col AS
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4
FOLLOWING, sum({CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING
AND 8 FOLLOWING, sum({CAST(annotated_d [...]
-09)----------------Projection: CAST(annotated_data_finite.desc_col AS Int64)
AS {CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}}, CAST(annotated_data_finite.inc_col AS
Int64) AS {CAST(annotated_data_finite.inc_col AS
Int64)|{annotated_data_finite.inc_col}}, annotated_data_finite.ts,
annotated_data_finite.inc_col, annotated_data_finite.desc_col
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING,
COUNT(Int64(1)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS COUNT(*) ROWS
BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+06)----------Projection: __common_expr_1, annotated_data_finite.inc_col,
sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING,
sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING,
sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING,
MIN(annotated_data_finite [...]
+07)------------WindowAggr: windowExpr=[[sum(__common_expr_2 AS
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS
LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS
LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, sum(__common_expr_2 AS
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MIN(an [...]
+08)--------------WindowAggr: windowExpr=[[sum(__common_expr_2 AS
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS
FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, sum(__common_expr_1 AS
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS
FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, sum(__common_expr_1 AS
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, [...]
+09)----------------Projection: CAST(annotated_data_finite.desc_col AS Int64)
AS __common_expr_1, CAST(annotated_data_finite.inc_col AS Int64) AS
__common_expr_2, annotated_data_finite.ts, annotated_data_finite.inc_col,
annotated_data_finite.desc_col
10)------------------TableScan: annotated_data_finite projection=[ts, inc_col,
desc_col]
physical_plan
01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3,
min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2,
max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as
sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as
minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as
cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
@@ -2586,10 +2586,10 @@ physical_plan
03)----SortExec: TopK(fetch=5), expr=[inc_col@24 DESC],
preserve_partitioning=[false]
04)------ProjectionExec: expr=[sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING@13 as sum1, sum(annotated_data_finite.desc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1
FOLLOWING@14 as sum2, sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10
FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDE [...]
05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.desc_col) ROWS
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)),
end_bound: Following(UInt64(1)), is_causal: false }, COUNT(*) ROWS BETWEEN 8
PRECEDING AND 1 FOLLOWING: Ok(Field { name: "C [...]
-06)----------ProjectionExec: expr=[{CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}}@0 as
{CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}}, inc_col@3 as inc_col,
sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@5 as
sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, s [...]
+06)----------ProjectionExec: expr=[__common_expr_1@0 as __common_expr_1,
inc_col@3 as inc_col, sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4
FOLLOWING@5 as sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4
FOLLOWING, sum(annotated_data_finite.desc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8
FOLLOWING@ [...]
07)------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(Int32(10)), end_bound: Fol [...]
08)--------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING
AND 4 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(Int32(4)), end_bound: [...]
-09)----------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as
{CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}}, CAST(inc_col@1 AS Int64) as
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}},
ts@0 as ts, inc_col@1 as inc_col, desc_col@2 as desc_col]
+09)----------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as
__common_expr_1, CAST(inc_col@1 AS Int64) as __common_expr_2, ts@0 as ts,
inc_col@1 as inc_col, desc_col@2 as desc_col]
10)------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts,
inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
query IIIIIIIIIIIIIIIIIIIIIIII
@@ -2738,9 +2738,9 @@ logical_plan
02)--Limit: skip=0, fetch=5
03)----Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
04)------Projection: sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING AS sum1, sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) [...]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite.inc_col AS
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN [...]
-06)----------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite.inc_col
AS Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE [...]
-07)------------Projection: CAST(annotated_data_finite.inc_col AS Int64) AS
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}},
CAST(annotated_data_finite.inc_col AS Float64) AS
{CAST(annotated_data_finite.inc_col AS
Float64)|{annotated_data_finite.inc_col}}, annotated_data_finite.ts,
annotated_data_finite.inc_col
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING,
MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING,
MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING,
COUNT(annotated_data_finit [...]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS
FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING,
MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING,
MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING,
COUNT(annotated_da [...]
+07)------------Projection: CAST(annotated_data_finite.inc_col AS Int64) AS
__common_expr_1, CAST(annotated_data_finite.inc_col AS Float64) AS
__common_expr_2, annotated_data_finite.ts, annotated_data_finite.inc_col
08)--------------TableScan: annotated_data_finite projection=[ts, inc_col]
physical_plan
01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1,
min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as
count2, avg1@8 as avg1, avg2@9 as avg2]
@@ -2749,7 +2749,7 @@ physical_plan
04)------ProjectionExec: expr=[sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING@9 as sum1, sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING@4 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING@10 as min1, MIN(annotated_dat [...]
05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND 5 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(Int32(NULL)), en [...]
06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(NULL [...]
-07)------------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}},
CAST(inc_col@1 AS Float64) as {CAST(annotated_data_finite.inc_col AS
Float64)|{annotated_data_finite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
+07)------------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
__common_expr_1, CAST(inc_col@1 AS Float64) as __common_expr_2, ts@0 as ts,
inc_col@1 as inc_col]
08)--------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts,
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
query IIIIIIIIRR
@@ -2839,9 +2839,9 @@ logical_plan
02)--Limit: skip=0, fetch=5
03)----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
04)------Projection: sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING AS sum1, sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING AS count1, COUNT(annotated_data_ [...]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite.inc_col
AS Int64)|{annotated_data_infinite.inc_col}} AS
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING,
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
-06)----------WindowAggr:
windowExpr=[[sum({CAST(annotated_data_infinite.inc_col AS
Int64)|{annotated_data_infinite.inc_col}} AS annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING]]
-07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS
{CAST(annotated_data_infinite.inc_col AS
Int64)|{annotated_data_infinite.inc_col}}, annotated_data_infinite.ts,
annotated_data_infinite.inc_col
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING,
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC
NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING,
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts
DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS
__common_expr_1, annotated_data_infinite.ts, annotated_data_infinite.inc_col
08)--------------TableScan: annotated_data_infinite projection=[ts, inc_col]
physical_plan
01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1,
count2@3 as count2]
@@ -2849,7 +2849,7 @@ physical_plan
03)----ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING@3 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@6 as count1, COUNT(a [...]
04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)) [...]
05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
{CAST(annotated_data_infinite.inc_col AS
Int64)|{annotated_data_infinite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
+06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
__common_expr_1, ts@0 as ts, inc_col@1 as inc_col]
07)------------StreamingTableExec: partition_sizes=1, projection=[ts,
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
query IIII
@@ -2886,9 +2886,9 @@ logical_plan
02)--Limit: skip=0, fetch=5
03)----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
04)------Projection: sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING AS sum1, sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING AS count1, COUNT(annotated_data_ [...]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite.inc_col
AS Int64)|{annotated_data_infinite.inc_col}} AS
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING,
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
-06)----------WindowAggr:
windowExpr=[[sum({CAST(annotated_data_infinite.inc_col AS
Int64)|{annotated_data_infinite.inc_col}} AS annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING]]
-07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS
{CAST(annotated_data_infinite.inc_col AS
Int64)|{annotated_data_infinite.inc_col}}, annotated_data_infinite.ts,
annotated_data_infinite.inc_col
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING,
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC
NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING,
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts
DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS
__common_expr_1, annotated_data_infinite.ts, annotated_data_infinite.inc_col
08)--------------TableScan: annotated_data_infinite projection=[ts, inc_col]
physical_plan
01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1,
count2@3 as count2]
@@ -2896,7 +2896,7 @@ physical_plan
03)----ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING@3 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@6 as count1, COUNT(a [...]
04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)) [...]
05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
{CAST(annotated_data_infinite.inc_col AS
Int64)|{annotated_data_infinite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
+06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
__common_expr_1, ts@0 as ts, inc_col@1 as inc_col]
07)------------StreamingTableExec: partition_sizes=1, projection=[ts,
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
@@ -2983,13 +2983,13 @@ EXPLAIN SELECT a, b, c,
logical_plan
01)Projection: annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, sum(annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1,
sum(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NU [...]
02)--Limit: skip=0, fetch=5
-03)----WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.c AS
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
sum({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS
annotated_data_infinite2.c) PARTITION BY [annotat [...]
-04)------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.c AS
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
sum({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotat
[...]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.c AS
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, sum({CAST(annotated_data_infinite2.c AS
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER [...]
-06)----------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.c AS
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
sum({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_i [...]
-07)------------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.c
AS Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
sum({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, a [...]
-08)--------------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.c
AS Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, sum({CAST(annotated_data_infinite2.c AS
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] [...]
-09)----------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS
{CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}},
annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, annotated_data_infinite2.d
+03)----WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC
NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING
AND 1 FOLLOWING, sum(__common_expr_1 AS annotated_data_infinite2.c) PARTITION
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST, annotated_data_infinite2.b ASC NULLS LAS [...]
+04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b,
annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, sum(__common_expr_1 AS annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS BETWEEN [...]
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b,
annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b,
annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS
LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, sum(__common_expr_1 AS annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infini [...]
+07)------------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, sum(__common_expr_1 AS annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS B [...]
+08)--------------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+09)----------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS
__common_expr_1, annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, annotated_data_infinite2.d
10)------------------TableScan: annotated_data_infinite2 projection=[a, b, c,
d]
physical_plan
01)ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c,
sum(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS
LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING@9 as sum1, sum(annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC N
[...]
@@ -3000,7 +3000,7 @@ physical_plan
06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.c)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS
LAST, annotated_data_infinite2.c ASC NULLS LA [...]
07)------------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.c)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING [...]
08)--------------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.c)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, d [...]
-09)----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as
{CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}}, a@0
as a, b@1 as b, c@2 as c, d@3 as d]
+09)----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as
__common_expr_1, a@0 as a, b@1 as b, c@2 as c, d@3 as d]
10)------------------StreamingTableExec: partition_sizes=1, projection=[a, b,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS
LAST, c@2 ASC NULLS LAST]
query IIIIIIIIIIIIIII
@@ -3052,13 +3052,13 @@ logical_plan
01)Limit: skip=0, fetch=5
02)--Sort: annotated_data_finite2.c ASC NULLS LAST, fetch=5
03)----Projection: annotated_data_finite2.a, annotated_data_finite2.b,
annotated_data_finite2.c, sum(annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1,
sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST,
annotated_ [...]
-04)------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST,
annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
sum({CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}} AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] [...]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d]
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND
1 FOLLOWING, sum({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.b, annotated_data_finite2.a, a [...]
-06)----------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, sum({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_
[...]
-07)------------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
sum({CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}} AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotate [...]
-08)--------------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite2.c
AS Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d]
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND
1 FOLLOWING, sum({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite [...]
-09)----------------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite2.c
AS Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, sum({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated [...]
-10)------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS
{CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}},
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c,
annotated_data_finite2.d
+04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY
[annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS
LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, sum(__common_expr_1 AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST,
annotated_data_finite2.b ASC NULLS LAST, annotated_dat [...]
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b,
annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, sum(__common_expr_1 AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d]
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND
1 [...]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b,
annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST]
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b,
annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST]
ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+07)------------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST,
annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, sum(__common_expr_1 AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LA
[...]
+08)--------------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, sum(__common_expr_1 AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d]
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING
[...]
+09)----------------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST]
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST]
ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+10)------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS
__common_expr_1, annotated_data_finite2.a, annotated_data_finite2.b,
annotated_data_finite2.c, annotated_data_finite2.d
11)--------------------TableScan: annotated_data_finite2 projection=[a, b, c,
d]
physical_plan
01)GlobalLimitExec: skip=0, fetch=5
@@ -3075,7 +3075,7 @@ physical_plan
12)----------------------BoundedWindowAggExec:
wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING: Ok(Field { name: "sum(annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d]
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND
1 FOL [...]
13)------------------------SortExec: expr=[a@1 ASC NULLS LAST,b@2 ASC NULLS
LAST,d@4 ASC NULLS LAST,c@3 ASC NULLS LAST], preserve_partitioning=[false]
14)--------------------------BoundedWindowAggExec:
wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST]
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST]
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true,
dict_ [...]
-15)----------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as
{CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}}, a@0 as a,
b@1 as b, c@2 as c, d@3 as d]
+15)----------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as
__common_expr_1, a@0 as a, b@1 as b, c@2 as c, d@3 as d]
16)------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS
LAST], has_header=true
query IIIIIIIIIIIIIII
@@ -3241,21 +3241,21 @@ FROM annotated_data_infinite2;
----
logical_plan
01)Projection: sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum1, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum2, sum(annotated_data_infinite2.a) PARTITION BY [ann [...]
-02)--WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-03)----Projection: {CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}, annotated_data_infinite2.a,
annotated_data_infinite2.d, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2. [...]
-04)------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
-06)----------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
-07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}},
annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, annotated_data_infinite2.d
+02)--WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
+03)----Projection: __common_expr_1, annotated_data_infinite2.a,
annotated_data_infinite2.d, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND C [...]
+04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b,
annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS
__common_expr_1, annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, annotated_data_infinite2.d
08)--------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
physical_plan
01)ProjectionExec: expr=[sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@3 as sum1, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@5 as sum2, sum(annotated_data_infinite2.a) PAR [...]
02)--BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d]
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }) [...]
-03)----ProjectionExec: expr=[{CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}@0 as {CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}, a@1 as a, d@4 as d,
sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as
sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotat [...]
+03)----ProjectionExec: expr=[__common_expr_1@0 as __common_expr_1, a@1 as a,
d@4 as d, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@5 as sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CU [...]
04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, nullabl [...]
05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, nulla [...]
06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, nul [...]
-07)------------ProjectionExec: expr=[CAST(a@0 AS Int64) as
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, a@0
as a, b@1 as b, c@2 as c, d@3 as d]
+07)------------ProjectionExec: expr=[CAST(a@0 AS Int64) as __common_expr_1,
a@0 as a, b@1 as b, c@2 as c, d@3 as d]
08)--------------StreamingTableExec: partition_sizes=1, projection=[a, b, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS
LAST, c@2 ASC NULLS LAST]
statement ok
@@ -3272,29 +3272,29 @@ FROM annotated_data_infinite2;
----
logical_plan
01)Projection: sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum1, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum2, sum(annotated_data_infinite2.a) PARTITION BY [ann [...]
-02)--WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-03)----Projection: {CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}, annotated_data_infinite2.a,
annotated_data_infinite2.d, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2. [...]
-04)------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
-06)----------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
-07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}},
annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, annotated_data_infinite2.d
+02)--WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
+03)----Projection: __common_expr_1, annotated_data_infinite2.a,
annotated_data_infinite2.d, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND C [...]
+04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b,
annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS
__common_expr_1, annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, annotated_data_infinite2.d
08)--------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
physical_plan
01)ProjectionExec: expr=[sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@3 as sum1, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@5 as sum2, sum(annotated_data_infinite2.a) PAR [...]
02)--BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d]
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }) [...]
03)----CoalesceBatchesExec: target_batch_size=4096
-04)------RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2,
preserve_order=true, sort_exprs={CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST,a@1 ASC NULLS LAST
-05)--------ProjectionExec: expr=[{CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}@0 as {CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}, a@1 as a, d@4 as d,
sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as
sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, ann
[...]
+04)------RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2,
preserve_order=true, sort_exprs=__common_expr_1@0 ASC NULLS LAST,a@1 ASC NULLS
LAST
+05)--------ProjectionExec: expr=[__common_expr_1@0 as __common_expr_1, a@1 as
a, d@4 as d, sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@5 as sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AN [...]
06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a)
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, nul [...]
07)------------CoalesceBatchesExec: target_batch_size=4096
-08)--------------RepartitionExec: partitioning=Hash([b@2, a@1], 2),
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC
NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
+08)--------------RepartitionExec: partitioning=Hash([b@2, a@1], 2),
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC
NULLS LAST,c@3 ASC NULLS LAST,__common_expr_1@0 ASC NULLS LAST
09)----------------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "sum(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int6 [...]
10)------------------CoalesceBatchesExec: target_batch_size=4096
-11)--------------------RepartitionExec: partitioning=Hash([a@1, d@4], 2),
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC
NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
+11)--------------------RepartitionExec: partitioning=Hash([a@1, d@4], 2),
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC
NULLS LAST,c@3 ASC NULLS LAST,__common_expr_1@0 ASC NULLS LAST
12)----------------------BoundedWindowAggExec:
wdw=[sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type [...]
13)------------------------CoalesceBatchesExec: target_batch_size=4096
-14)--------------------------RepartitionExec: partitioning=Hash([a@1, b@2],
2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2
ASC NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
-15)----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, a@0
as a, b@1 as b, c@2 as c, d@3 as d]
+14)--------------------------RepartitionExec: partitioning=Hash([a@1, b@2],
2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2
ASC NULLS LAST,c@3 ASC NULLS LAST,__common_expr_1@0 ASC NULLS LAST
+15)----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as
__common_expr_1, a@0 as a, b@1 as b, c@2 as c, d@3 as d]
16)------------------------------RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=1
17)--------------------------------StreamingTableExec: partition_sizes=1,
projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]