This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9e848bf079 Fix bug, first last reverse (#7914)
9e848bf079 is described below
commit 9e848bf0790aea2323b936e9298af8bcc7e05c85
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Oct 24 13:58:01 2023 +0300
Fix bug, first last reverse (#7914)
---
.../physical-expr/src/aggregate/first_last.rs | 8 ++++--
datafusion/physical-expr/src/aggregate/utils.rs | 12 +++-----
datafusion/sqllogictest/test_files/groupby.slt | 32 ++++++++++++++++++++++
3 files changed, 41 insertions(+), 11 deletions(-)
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index 6ae7b4895a..ce7a1daeec 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -22,7 +22,9 @@ use std::sync::Arc;
use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
use crate::expressions::format_state_name;
-use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use crate::{
+ reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
+};
use arrow::array::ArrayRef;
use arrow::compute;
@@ -126,7 +128,7 @@ impl AggregateExpr for FirstValue {
self.expr.clone(),
name,
self.input_data_type.clone(),
- self.ordering_req.clone(),
+ reverse_order_bys(&self.ordering_req),
self.order_by_data_types.clone(),
)))
}
@@ -350,7 +352,7 @@ impl AggregateExpr for LastValue {
self.expr.clone(),
name,
self.input_data_type.clone(),
- self.ordering_req.clone(),
+ reverse_order_bys(&self.ordering_req),
self.order_by_data_types.clone(),
)))
}
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index 420b26eb2d..da3a527132 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -177,14 +177,10 @@ pub fn adjust_output_array(
/// for [`AggregateExpr`] aggregation expressions and allows comparing the equality
/// between the trait objects.
pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
- if any.is::<Arc<dyn AggregateExpr>>() {
- any.downcast_ref::<Arc<dyn AggregateExpr>>()
- .unwrap()
- .as_any()
- } else if any.is::<Box<dyn AggregateExpr>>() {
- any.downcast_ref::<Box<dyn AggregateExpr>>()
- .unwrap()
- .as_any()
+ if let Some(obj) = any.downcast_ref::<Arc<dyn AggregateExpr>>() {
+ obj.as_any()
+ } else if let Some(obj) = any.downcast_ref::<Box<dyn AggregateExpr>>() {
+ obj.as_any()
} else {
any
}
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index bf93c6633b..5cb3ac2f81 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -3613,6 +3613,38 @@ AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(foo.x)]
------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
--------MemoryExec: partitions=1, partition_sizes=[1]
+# Since both ordering requirements are satisfied, there shouldn't be
+# any SortExec in the final plan.
+query TT
+EXPLAIN SELECT FIRST_VALUE(a ORDER BY a ASC) as first_a,
+ LAST_VALUE(c ORDER BY c DESC) as last_c
+FROM multiple_ordered_table
+GROUP BY d;
+----
+logical_plan
+Projection: FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST] AS first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] AS last_c
+--Aggregate: groupBy=[[multiple_ordered_table.d]], aggr=[[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST], LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]]]
+----TableScan: multiple_ordered_table projection=[a, c, d]
+physical_plan
+ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c]
+--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
+----CoalesceBatchesExec: target_batch_size=2
+------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8
+--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
+----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true
+
+query II rowsort
+SELECT FIRST_VALUE(a ORDER BY a ASC) as first_a,
+ LAST_VALUE(c ORDER BY c DESC) as last_c
+FROM multiple_ordered_table
+GROUP BY d;
+----
+0 0
+0 1
+0 15
+0 4
+0 9
query TT
EXPLAIN SELECT c