This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e848bf079 Fix bug, first last reverse (#7914)
9e848bf079 is described below

commit 9e848bf0790aea2323b936e9298af8bcc7e05c85
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Oct 24 13:58:01 2023 +0300

    Fix bug, first last reverse (#7914)
---
 .../physical-expr/src/aggregate/first_last.rs      |  8 ++++--
 datafusion/physical-expr/src/aggregate/utils.rs    | 12 +++-----
 datafusion/sqllogictest/test_files/groupby.slt     | 32 ++++++++++++++++++++++
 3 files changed, 41 insertions(+), 11 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index 6ae7b4895a..ce7a1daeec 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -22,7 +22,9 @@ use std::sync::Arc;
 
 use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
 use crate::expressions::format_state_name;
-use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use crate::{
+    reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
+};
 
 use arrow::array::ArrayRef;
 use arrow::compute;
@@ -126,7 +128,7 @@ impl AggregateExpr for FirstValue {
             self.expr.clone(),
             name,
             self.input_data_type.clone(),
-            self.ordering_req.clone(),
+            reverse_order_bys(&self.ordering_req),
             self.order_by_data_types.clone(),
         )))
     }
@@ -350,7 +352,7 @@ impl AggregateExpr for LastValue {
             self.expr.clone(),
             name,
             self.input_data_type.clone(),
-            self.ordering_req.clone(),
+            reverse_order_bys(&self.ordering_req),
             self.order_by_data_types.clone(),
         )))
     }
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index 420b26eb2d..da3a527132 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -177,14 +177,10 @@ pub fn adjust_output_array(
 /// for [`AggregateExpr`] aggregation expressions and allows comparing the equality
 /// between the trait objects.
 pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
-    if any.is::<Arc<dyn AggregateExpr>>() {
-        any.downcast_ref::<Arc<dyn AggregateExpr>>()
-            .unwrap()
-            .as_any()
-    } else if any.is::<Box<dyn AggregateExpr>>() {
-        any.downcast_ref::<Box<dyn AggregateExpr>>()
-            .unwrap()
-            .as_any()
+    if let Some(obj) = any.downcast_ref::<Arc<dyn AggregateExpr>>() {
+        obj.as_any()
+    } else if let Some(obj) = any.downcast_ref::<Box<dyn AggregateExpr>>() {
+        obj.as_any()
     } else {
         any
     }
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index bf93c6633b..5cb3ac2f81 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -3613,6 +3613,38 @@ AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(foo.x)]
 ------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
 --------MemoryExec: partitions=1, partition_sizes=[1]
 
+# Since both ordering requirements are satisfied, there shouldn't be
+# any SortExec in the final plan.
+query TT
+EXPLAIN SELECT FIRST_VALUE(a ORDER BY a ASC) as first_a,
+  LAST_VALUE(c ORDER BY c DESC) as last_c
+FROM multiple_ordered_table
+GROUP BY d;
+----
+logical_plan
+Projection: FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST] AS first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] AS last_c
+--Aggregate: groupBy=[[multiple_ordered_table.d]], aggr=[[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST], LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]]]
+----TableScan: multiple_ordered_table projection=[a, c, d]
+physical_plan
+ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c]
+--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
+----CoalesceBatchesExec: target_batch_size=2
+------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8
+--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
+----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true
+
+query II rowsort
+SELECT FIRST_VALUE(a ORDER BY a ASC) as first_a,
+  LAST_VALUE(c ORDER BY c DESC) as last_c
+FROM multiple_ordered_table
+GROUP BY d;
+----
+0 0
+0 1
+0 15
+0 4
+0 9
 
 query TT
 EXPLAIN SELECT c

Reply via email to