This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9deec2ad72 minor: implement with_new_expressions for AggregateFunctionExpr (#16897)
9deec2ad72 is described below
commit 9deec2ad725813e266dc8c51ec75302fed7412f8
Author: Berkay Şahin <[email protected]>
AuthorDate: Sat Jul 26 20:15:48 2025 +0300
minor: implement with_new_expressions for AggregateFunctionExpr (#16897)
* minor
* Update aggregate.rs
---
datafusion/expr/src/window_state.rs | 10 +++---
datafusion/physical-expr/src/aggregate.rs | 38 ++++++++++++++++++++--
.../physical-expr/src/expressions/literal.rs | 2 +-
datafusion/physical-expr/src/window/aggregate.rs | 6 +++-
.../physical-expr/src/window/sliding_aggregate.rs | 6 +++-
datafusion/physical-expr/src/window/standard.rs | 4 +++
datafusion/physical-expr/src/window/window_expr.rs | 6 ++++
.../group_values/single_group_by/bytes.rs | 13 ++++----
datafusion/physical-plan/src/test.rs | 2 +-
9 files changed, 68 insertions(+), 19 deletions(-)
diff --git a/datafusion/expr/src/window_state.rs b/datafusion/expr/src/window_state.rs
index a101b8fe4d..014bed5aea 100644
--- a/datafusion/expr/src/window_state.rs
+++ b/datafusion/expr/src/window_state.rs
@@ -34,7 +34,7 @@ use datafusion_common::{
};
/// Holds the state of evaluating a window function
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct WindowAggState {
/// The range that we calculate the window function
pub window_frame_range: Range<usize>,
@@ -112,7 +112,7 @@ impl WindowAggState {
}
/// This object stores the window frame state for use in incremental calculations.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub enum WindowFrameContext {
/// ROWS frames are inherently stateless.
Rows(Arc<WindowFrame>),
@@ -240,7 +240,7 @@ impl WindowFrameContext {
}
/// State for each unique partition determined according to PARTITION BY column(s)
-#[derive(Debug)]
+#[derive(Debug, Clone, PartialEq)]
pub struct PartitionBatchState {
/// The record batch belonging to current partition
pub record_batch: RecordBatch,
@@ -282,7 +282,7 @@ impl PartitionBatchState {
/// ranges of data while processing RANGE frames.
/// Attribute `sort_options` stores the column ordering specified by the ORDER
/// BY clause. This information is used to calculate the range.
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
pub struct WindowFrameStateRange {
sort_options: Vec<SortOptions>,
}
@@ -454,7 +454,7 @@ impl WindowFrameStateRange {
/// This structure encapsulates all the state information we require as we
/// scan groups of data while processing window frames.
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
pub struct WindowFrameStateGroups {
/// A tuple containing group values and the row index where the group ends.
/// Example: [[1, 1], [1, 1], [2, 1], [2, 1], ...] would correspond to
diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs
index 9175c01274..ed30481182 100644
--- a/datafusion/physical-expr/src/aggregate.rs
+++ b/datafusion/physical-expr/src/aggregate.rs
@@ -616,10 +616,42 @@ impl AggregateFunctionExpr {
/// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, otherwise returns `None`.
pub fn with_new_expressions(
&self,
- _args: Vec<Arc<dyn PhysicalExpr>>,
- _order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
+ args: Vec<Arc<dyn PhysicalExpr>>,
+ order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
) -> Option<AggregateFunctionExpr> {
- None
+ if args.len() != self.args.len()
+ || (self.order_sensitivity() !=
AggregateOrderSensitivity::Insensitive
+ && order_by_exprs.len() != self.order_bys.len())
+ {
+ return None;
+ }
+
+ let new_order_bys = self
+ .order_bys
+ .iter()
+ .zip(order_by_exprs)
+ .map(|(req, new_expr)| PhysicalSortExpr {
+ expr: new_expr,
+ options: req.options,
+ })
+ .collect();
+
+ Some(AggregateFunctionExpr {
+ fun: self.fun.clone(),
+ args,
+ return_field: Arc::clone(&self.return_field),
+ name: self.name.clone(),
+ // TODO: Human name should be updated after re-write to not mislead
+ human_display: self.human_display.clone(),
+ schema: self.schema.clone(),
+ order_bys: new_order_bys,
+ ignore_nulls: self.ignore_nulls,
+ ordering_fields: self.ordering_fields.clone(),
+ is_distinct: self.is_distinct,
+ is_reversed: false,
+ input_fields: self.input_fields.clone(),
+ is_nullable: self.is_nullable,
+ })
}
/// If this function is max, return (output_field, true)
diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs
index 1a2ebf000f..6e425ee439 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -36,7 +36,7 @@ use datafusion_expr_common::interval_arithmetic::Interval;
use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties};
/// Represents a literal value
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Literal {
value: ScalarValue,
field: FieldRef,
diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs
index 6f0e7c963d..d7287c27de 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use crate::aggregate::AggregateFunctionExpr;
use crate::window::standard::add_new_ordering_expr_with_partition_by;
-use crate::window::window_expr::AggregateWindowExpr;
+use crate::window::window_expr::{AggregateWindowExpr, WindowFn};
use crate::window::{
PartitionBatches, PartitionWindowAggStates, SlidingAggregateWindowExpr,
WindowExpr,
};
@@ -211,6 +211,10 @@ impl WindowExpr for PlainAggregateWindowExpr {
fn uses_bounded_memory(&self) -> bool {
!self.window_frame.end_bound.is_unbounded()
}
+
+ fn create_window_fn(&self) -> Result<WindowFn> {
+ Ok(WindowFn::Aggregate(self.get_accumulator()?))
+ }
}
impl AggregateWindowExpr for PlainAggregateWindowExpr {
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 33921a57a6..cb105e773d 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -22,7 +22,7 @@ use std::ops::Range;
use std::sync::Arc;
use crate::aggregate::AggregateFunctionExpr;
-use crate::window::window_expr::AggregateWindowExpr;
+use crate::window::window_expr::{AggregateWindowExpr, WindowFn};
use crate::window::{
PartitionBatches, PartitionWindowAggStates, PlainAggregateWindowExpr,
WindowExpr,
};
@@ -175,6 +175,10 @@ impl WindowExpr for SlidingAggregateWindowExpr {
window_frame: Arc::clone(&self.window_frame),
}))
}
+
+ fn create_window_fn(&self) -> Result<WindowFn> {
+ Ok(WindowFn::Aggregate(self.get_accumulator()?))
+ }
}
impl AggregateWindowExpr for SlidingAggregateWindowExpr {
diff --git a/datafusion/physical-expr/src/window/standard.rs b/datafusion/physical-expr/src/window/standard.rs
index c3761aa78f..7b208ea41f 100644
--- a/datafusion/physical-expr/src/window/standard.rs
+++ b/datafusion/physical-expr/src/window/standard.rs
@@ -275,6 +275,10 @@ impl WindowExpr for StandardWindowExpr {
false
}
}
+
+ fn create_window_fn(&self) -> Result<WindowFn> {
+ Ok(WindowFn::Builtin(self.expr.create_evaluator()?))
+ }
}
/// Adds a new ordering expression into existing ordering equivalence class(es) based on
diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs
index dd671e0685..ee39b5b245 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -130,6 +130,12 @@ pub trait WindowExpr: Send + Sync + Debug {
/// Get the reverse expression of this [WindowExpr].
fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>>;
+ /// Creates a new instance of the window function evaluator.
+ ///
+ /// Returns `WindowFn::Builtin` for built-in window functions (e.g., ROW_NUMBER, RANK)
+ /// or `WindowFn::Aggregate` for aggregate window functions (e.g., SUM, AVG).
+ fn create_window_fn(&self) -> Result<WindowFn>;
+
/// Returns all expressions used in the [`WindowExpr`].
/// These expressions are (1) function arguments, (2) partition by expressions, (3) order by expressions.
fn all_expressions(&self) -> WindowPhysicalExpressions {
diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
index 9686b8c352..21078ceb8a 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
@@ -15,11 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+use std::mem::size_of;
+
use crate::aggregates::group_values::GroupValues;
+
use arrow::array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch};
+use datafusion_common::Result;
use datafusion_expr::EmitTo;
use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType};
-use std::mem::size_of;
/// A [`GroupValues`] storing single column of Utf8/LargeUtf8/Binary/LargeBinary values
///
@@ -42,11 +45,7 @@ impl<O: OffsetSizeTrait> GroupValuesByes<O> {
}
impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
- fn intern(
- &mut self,
- cols: &[ArrayRef],
- groups: &mut Vec<usize>,
- ) -> datafusion_common::Result<()> {
+ fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) ->
Result<()> {
assert_eq!(cols.len(), 1);
// look up / add entries in the table
@@ -85,7 +84,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
self.num_groups
}
- fn emit(&mut self, emit_to: EmitTo) ->
datafusion_common::Result<Vec<ArrayRef>> {
+ fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
// Reset the map to default, and convert it into a single array
let map_contents = self.map.take().into_state();
diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs
index be921e0581..349f9955b6 100644
--- a/datafusion/physical-plan/src/test.rs
+++ b/datafusion/physical-plan/src/test.rs
@@ -131,7 +131,7 @@ impl ExecutionPlan for TestMemoryExec {
}
fn as_any(&self) -> &dyn Any {
- unimplemented!()
+ self
}
fn properties(&self) -> &PlanProperties {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]