jayzhan211 commented on code in PR #10648:
URL: https://github.com/apache/datafusion/pull/10648#discussion_r1613370248


##########
datafusion/functions-aggregate/src/first_last.rs:
##########
@@ -161,6 +144,23 @@ impl AggregateUDFImpl for FirstValue {
     fn aliases(&self) -> &[String] {
         &self.aliases
     }
+
+    fn with_requirement_satisfied(
+        self: Arc<Self>,
+        requirement_satisfied: bool,
+    ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
+        Ok(Some(Arc::new(
+            FirstValue::new().with_requirement_satisfied(requirement_satisfied),
+        )))
+    }
+
+    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+        AggregateOrderSensitivity::Beneficial
+    }
+
+    fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
+        datafusion_expr::ReversedUDAF::Reversed(Arc::new(LastValue::new()))

Review Comment:
   I think we can use `last_value_udaf` here to avoid the cost of re-instantiating `LastValue` on every call.



##########
datafusion/functions-aggregate/src/first_last.rs:
##########
@@ -338,355 +338,131 @@ impl Accumulator for FirstValueAccumulator {
     }
 }
 
-/// TO BE DEPRECATED: Builtin FIRST_VALUE physical aggregate expression will 
be replaced by udf in the future
-#[derive(Debug, Clone)]
-pub struct FirstValuePhysicalExpr {
-    name: String,
-    input_data_type: DataType,
-    order_by_data_types: Vec<DataType>,
-    expr: Arc<dyn PhysicalExpr>,
-    ordering_req: LexOrdering,
+make_udaf_expr_and_func!(
+    LastValue,
+    last_value,
+    "Returns the last value in a group of values.",
+    last_value_udaf
+);
+
+pub struct LastValue {
+    signature: Signature,
+    aliases: Vec<String>,
     requirement_satisfied: bool,
-    ignore_nulls: bool,
-    state_fields: Vec<Field>,
 }
 
-impl FirstValuePhysicalExpr {
-    /// Creates a new FIRST_VALUE aggregation function.
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        input_data_type: DataType,
-        ordering_req: LexOrdering,
-        order_by_data_types: Vec<DataType>,
-        state_fields: Vec<Field>,
-    ) -> Self {
-        let requirement_satisfied = ordering_req.is_empty();
-        Self {
-            name: name.into(),
-            input_data_type,
-            order_by_data_types,
-            expr,
-            ordering_req,
-            requirement_satisfied,
-            ignore_nulls: false,
-            state_fields,
-        }
-    }
-
-    pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
-        self.ignore_nulls = ignore_nulls;
-        self
-    }
-
-    /// Returns the name of the aggregate expression.
-    pub fn name(&self) -> &str {
-        &self.name
-    }
-
-    /// Returns the input data type of the aggregate expression.
-    pub fn input_data_type(&self) -> &DataType {
-        &self.input_data_type
-    }
-
-    /// Returns the data types of the order-by columns.
-    pub fn order_by_data_types(&self) -> &Vec<DataType> {
-        &self.order_by_data_types
+impl Debug for LastValue {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("LastValue")
+            .field("name", &self.name())
+            .field("signature", &self.signature)
+            .field("accumulator", &"<FUNC>")
+            .finish()
     }
+}
 
-    /// Returns the expression associated with the aggregate function.
-    pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
-        &self.expr
+impl Default for LastValue {
+    fn default() -> Self {
+        Self::new()
     }
+}
 
-    /// Returns the lexical ordering requirements of the aggregate expression.
-    pub fn ordering_req(&self) -> &LexOrdering {
-        &self.ordering_req
+impl LastValue {
+    pub fn new() -> Self {
+        Self {
+            aliases: vec![String::from("LAST_VALUE"), 
String::from("last_value")],
+            signature: Signature::one_of(
+                vec![
+                    // TODO: we can introduce more strict signature that only 
numeric of array types are allowed
+                    
TypeSignature::ArraySignature(ArrayFunctionSignature::Array),
+                    TypeSignature::Uniform(1, NUMERICS.to_vec()),
+                ],
+                Volatility::Immutable,
+            ),
+            requirement_satisfied: false,
+        }
     }
 
-    pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool) 
-> Self {
+    fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> 
Self {
         self.requirement_satisfied = requirement_satisfied;
         self
     }
-
-    pub fn convert_to_last(self) -> LastValuePhysicalExpr {
-        let mut name = format!("LAST{}", &self.name[5..]);
-        replace_order_by_clause(&mut name);
-
-        let FirstValuePhysicalExpr {
-            expr,
-            input_data_type,
-            ordering_req,
-            order_by_data_types,
-            ..
-        } = self;
-        LastValuePhysicalExpr::new(
-            expr,
-            name,
-            input_data_type,
-            reverse_order_bys(&ordering_req),
-            order_by_data_types,
-        )
-    }
 }
 
-impl AggregateExpr for FirstValuePhysicalExpr {
-    /// Return a reference to Any that can be used for downcasting
+impl AggregateUDFImpl for LastValue {
     fn as_any(&self) -> &dyn Any {
         self
     }
 
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new(&self.name, self.input_data_type.clone(), true))
-    }
-
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        FirstValueAccumulator::try_new(
-            &self.input_data_type,
-            &self.order_by_data_types,
-            self.ordering_req.clone(),
-            self.ignore_nulls,
-        )
-        .map(|acc| {
-            
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
-        })
-    }
-
-    fn state_fields(&self) -> Result<Vec<Field>> {
-        if !self.state_fields.is_empty() {
-            return Ok(self.state_fields.clone());
-        }
-
-        let mut fields = vec![Field::new(
-            format_state_name(&self.name, "first_value"),
-            self.input_data_type.clone(),
-            true,
-        )];
-        fields.extend(ordering_fields(
-            &self.ordering_req,
-            &self.order_by_data_types,
-        ));
-        fields.push(Field::new(
-            format_state_name(&self.name, "is_set"),
-            DataType::Boolean,
-            true,
-        ));
-        Ok(fields)
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
-    }
-
-    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
-        (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
-    }
-
     fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
-        Some(Arc::new(self.clone().convert_to_last()))
-    }
-
-    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        FirstValueAccumulator::try_new(
-            &self.input_data_type,
-            &self.order_by_data_types,
-            self.ordering_req.clone(),
-            self.ignore_nulls,
-        )
-        .map(|acc| {
-            
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
-        })
-    }
-}
-
-impl PartialEq<dyn Any> for FirstValuePhysicalExpr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.input_data_type == x.input_data_type
-                    && self.order_by_data_types == x.order_by_data_types
-                    && self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
-    }
-}
-
-/// TO BE DEPRECATED: Builtin LAST_VALUE physical aggregate expression will be 
replaced by udf in the future
-#[derive(Debug, Clone)]
-pub struct LastValuePhysicalExpr {
-    name: String,
-    input_data_type: DataType,
-    order_by_data_types: Vec<DataType>,
-    expr: Arc<dyn PhysicalExpr>,
-    ordering_req: LexOrdering,
-    requirement_satisfied: bool,
-    ignore_nulls: bool,
-}
-
-impl LastValuePhysicalExpr {
-    /// Creates a new LAST_VALUE aggregation function.
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        input_data_type: DataType,
-        ordering_req: LexOrdering,
-        order_by_data_types: Vec<DataType>,
-    ) -> Self {
-        let requirement_satisfied = ordering_req.is_empty();
-        Self {
-            name: name.into(),
-            input_data_type,
-            order_by_data_types,
-            expr,
-            ordering_req,
-            requirement_satisfied,
-            ignore_nulls: false,
-        }
-    }
-
-    pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
-        self.ignore_nulls = ignore_nulls;
-        self
-    }
-
-    /// Returns the name of the aggregate expression.
-    pub fn name(&self) -> &str {
-        &self.name
+        "LAST_VALUE"
     }
 
-    /// Returns the input data type of the aggregate expression.
-    pub fn input_data_type(&self) -> &DataType {
-        &self.input_data_type
-    }
-
-    /// Returns the data types of the order-by columns.
-    pub fn order_by_data_types(&self) -> &Vec<DataType> {
-        &self.order_by_data_types
-    }
-
-    /// Returns the expression associated with the aggregate function.
-    pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
-        &self.expr
-    }
-
-    /// Returns the lexical ordering requirements of the aggregate expression.
-    pub fn ordering_req(&self) -> &LexOrdering {
-        &self.ordering_req
+    fn signature(&self) -> &Signature {
+        &self.signature
     }
 
-    pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool) 
-> Self {
-        self.requirement_satisfied = requirement_satisfied;
-        self
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        Ok(arg_types[0].clone())
     }
 
-    pub fn convert_to_first(self) -> FirstValuePhysicalExpr {
-        let mut name = format!("FIRST{}", &self.name[4..]);
-        replace_order_by_clause(&mut name);
-
-        let LastValuePhysicalExpr {
-            expr,
-            input_data_type,
-            ordering_req,
-            order_by_data_types,
-            ..
-        } = self;
-        FirstValuePhysicalExpr::new(
-            expr,
-            name,
-            input_data_type,
-            reverse_order_bys(&ordering_req),
-            order_by_data_types,
-            vec![],
-        )
-    }
-}
+    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
+        let ordering_req =
+            convert_logical_sort_exprs_to_physical(acc_args.sort_exprs, 
acc_args.schema)?;
 
-impl AggregateExpr for LastValuePhysicalExpr {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
+        let ordering_dtypes = ordering_req
+            .iter()
+            .map(|e| e.expr.data_type(acc_args.schema))
+            .collect::<Result<Vec<_>>>()?;
 
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new(&self.name, self.input_data_type.clone(), true))
-    }
+        let requirement_satisfied = ordering_req.is_empty() || 
self.requirement_satisfied;
 
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         LastValueAccumulator::try_new(
-            &self.input_data_type,
-            &self.order_by_data_types,
-            self.ordering_req.clone(),
-            self.ignore_nulls,
+            acc_args.data_type,
+            &ordering_dtypes,
+            ordering_req,
+            acc_args.ignore_nulls,
         )
-        .map(|acc| {
-            
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
-        })
+        .map(|acc| 
Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _)
     }
 
-    fn state_fields(&self) -> Result<Vec<Field>> {
+    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+        let StateFieldsArgs {
+            name,
+            input_type,
+            return_type: _,
+            ordering_fields,
+            is_distinct: _,
+        } = args;
         let mut fields = vec![Field::new(
-            format_state_name(&self.name, "last_value"),
-            self.input_data_type.clone(),
+            format_state_name(name, "last_value"),
+            input_type.clone(),
             true,
         )];
-        fields.extend(ordering_fields(
-            &self.ordering_req,
-            &self.order_by_data_types,
-        ));
-        fields.push(Field::new(
-            format_state_name(&self.name, "is_set"),
-            DataType::Boolean,
-            true,
-        ));
+        fields.extend(ordering_fields.to_vec());
+        fields.push(Field::new("is_set", DataType::Boolean, true));
         Ok(fields)
     }
 
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
-    }
-
-    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
-        (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
-    }
-
-    fn name(&self) -> &str {
-        &self.name
+    fn aliases(&self) -> &[String] {
+        &self.aliases
     }
 
-    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
-        Some(Arc::new(self.clone().convert_to_first()))
+    fn with_requirement_satisfied(
+        self: Arc<Self>,
+        requirement_satisfied: bool,
+    ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
+        Ok(Some(Arc::new(
+            LastValue::new().with_requirement_satisfied(requirement_satisfied),
+        )))
     }
 
-    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        LastValueAccumulator::try_new(
-            &self.input_data_type,
-            &self.order_by_data_types,
-            self.ordering_req.clone(),
-            self.ignore_nulls,
-        )
-        .map(|acc| {
-            
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
-        })
+    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+        AggregateOrderSensitivity::Beneficial
     }
-}
 
-impl PartialEq<dyn Any> for LastValuePhysicalExpr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.input_data_type == x.input_data_type
-                    && self.order_by_data_types == x.order_by_data_types
-                    && self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
+    fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
+        datafusion_expr::ReversedUDAF::Reversed(Arc::new(FirstValue::new()))

Review Comment:
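   Same suggestion as for `FirstValue::reverse_expr`: this could reuse the existing UDAF instance rather than re-instantiating with `FirstValue::new()`.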



##########
datafusion/physical-expr-common/src/aggregate/mod.rs:
##########
@@ -303,6 +337,74 @@ impl AggregateExpr for AggregateFunctionExpr {
     fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
         (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
     }
+
+    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+        if !self.ordering_req.is_empty() {
+            // If there is requirement, use the sensitivity of the 
implementation
+            self.fun.order_sensitivity()
+        } else {
+            // If no requirement, aggregator is order insensitive
+            AggregateOrderSensitivity::Insensitive
+        }
+    }
+
+    fn with_requirement_satisfied(
+        self: Arc<Self>,
+        requirement_satisfied: bool,
+    ) -> Result<Option<Arc<dyn AggregateExpr>>> {
+        let Some(updated_fn) = self
+            .fun
+            .clone()
+            .with_requirement_satisfied(requirement_satisfied)?
+        else {
+            return Ok(None);
+        };
+        create_aggregate_expr(
+            &updated_fn,
+            &self.args,
+            &self.sort_exprs,
+            &self.ordering_req,
+            &self.schema,
+            self.name(),
+            self.ignore_nulls,
+            self.is_distinct,
+        )
+        .map(Some)
+    }
+
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {

Review Comment:
   💯 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to