This is an automated email from the ASF dual-hosted git repository. jiayuliu pushed a commit to branch move-udf-udaf-expr in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit dea18674f46540d21ae39825a1406689b97007e4 Author: Jiayu Liu <ji...@hey.com> AuthorDate: Tue Feb 8 20:53:42 2022 +0800 udf and udaf --- datafusion-expr/src/{lib.rs => expr.rs} | 16 ----- datafusion-expr/src/lib.rs | 4 ++ .../udf.rs => datafusion-expr/src/udaf.rs | 83 +++++++++------------- .../physical_plan => datafusion-expr/src}/udf.rs | 23 ------ datafusion/src/physical_plan/udf.rs | 69 +----------------- 5 files changed, 37 insertions(+), 158 deletions(-) diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/expr.rs similarity index 61% copy from datafusion-expr/src/lib.rs copy to datafusion-expr/src/expr.rs index eb86b0b..b248758 100644 --- a/datafusion-expr/src/lib.rs +++ b/datafusion-expr/src/expr.rs @@ -14,19 +14,3 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - -mod accumulator; -mod aggregate_function; -mod built_in_function; -mod operator; -mod signature; -mod window_frame; -mod window_function; - -pub use accumulator::Accumulator; -pub use aggregate_function::AggregateFunction; -pub use built_in_function::BuiltinScalarFunction; -pub use operator::Operator; -pub use signature::{Signature, TypeSignature, Volatility}; -pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; -pub use window_function::{BuiltInWindowFunction, WindowFunction}; diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs index eb86b0b..0afc910 100644 --- a/datafusion-expr/src/lib.rs +++ b/datafusion-expr/src/lib.rs @@ -20,6 +20,8 @@ mod aggregate_function; mod built_in_function; mod operator; mod signature; +mod udaf; +mod udf; mod window_frame; mod window_function; @@ -28,5 +30,7 @@ pub use aggregate_function::AggregateFunction; pub use built_in_function::BuiltinScalarFunction; pub use operator::Operator; pub use signature::{Signature, TypeSignature, Volatility}; +pub use udaf::AggregateUDF; +pub use udf::ScalarUDF; pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; pub use window_function::{BuiltInWindowFunction, WindowFunction}; diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion-expr/src/udaf.rs similarity index 50% copy from datafusion/src/physical_plan/udf.rs copy to datafusion-expr/src/udaf.rs index 7355746..3fea4d9 100644 --- a/datafusion/src/physical_plan/udf.rs +++ b/datafusion-expr/src/udaf.rs @@ -15,27 +15,34 @@ // specific language governing permissions and limitations // under the License. -//! UDF support +//! This module contains functions and structs supporting user-defined aggregate functions. use fmt::{Debug, Formatter}; +use std::any::Any; use std::fmt; -use arrow::datatypes::Schema; +use arrow::{ + datatypes::Field, + datatypes::{DataType, Schema}, +}; -use crate::error::Result; -use crate::{logical_plan::Expr, physical_plan::PhysicalExpr}; +use crate::physical_plan::PhysicalExpr; +use crate::{error::Result, logical_plan::Expr}; use super::{ - functions::{ - ReturnTypeFunction, ScalarFunctionExpr, ScalarFunctionImplementation, Signature, - }, + aggregates::AccumulatorFunctionImplementation, + aggregates::StateTypeFunction, + expressions::format_state_name, + functions::{ReturnTypeFunction, Signature}, type_coercion::coerce, + Accumulator, AggregateExpr, }; use std::sync::Arc; -/// Logical representation of a UDF. +/// Logical representation of a user-defined aggregate function (UDAF) +/// A UDAF is different from a UDF in that it is stateful across batches. #[derive(Clone)] -pub struct ScalarUDF { +pub struct AggregateUDF { /// name pub name: String, /// signature @@ -43,19 +50,14 @@ pub struct ScalarUDF { /// Return type pub return_type: ReturnTypeFunction, /// actual implementation - /// - /// The fn param is the wrapped function but be aware that the function will - /// be passed with the slice / vec of columnar values (either scalar or array) - /// with the exception of zero param function, where a singular element vec - /// will be passed. In that case the single element is a null array to indicate - /// the batch's row count (so that the generative zero-argument function can know - /// the result array size). - pub fun: ScalarFunctionImplementation, + pub accumulator: AccumulatorFunctionImplementation, + /// the accumulator's state's description as a function of the return type + pub state_type: StateTypeFunction, } -impl Debug for ScalarUDF { +impl Debug for AggregateUDF { fn fmt(&self, f: &mut Formatter) -> fmt::Result { - f.debug_struct("ScalarUDF") + f.debug_struct("AggregateUDF") .field("name", &self.name) .field("signature", &self.signature) .field("fun", &"<FUNC>") @@ -63,64 +65,43 @@ impl Debug for ScalarUDF { } } -impl PartialEq for ScalarUDF { +impl PartialEq for AggregateUDF { fn eq(&self, other: &Self) -> bool { self.name == other.name && self.signature == other.signature } } -impl std::hash::Hash for ScalarUDF { +impl std::hash::Hash for AggregateUDF { fn hash<H: std::hash::Hasher>(&self, state: &mut H) { self.name.hash(state); self.signature.hash(state); } } -impl ScalarUDF { - /// Create a new ScalarUDF +impl AggregateUDF { + /// Create a new AggregateUDF pub fn new( name: &str, signature: &Signature, return_type: &ReturnTypeFunction, - fun: &ScalarFunctionImplementation, + accumulator: &AccumulatorFunctionImplementation, + state_type: &StateTypeFunction, ) -> Self { Self { name: name.to_owned(), signature: signature.clone(), return_type: return_type.clone(), - fun: fun.clone(), + accumulator: accumulator.clone(), + state_type: state_type.clone(), } } - /// creates a logical expression with a call of the UDF - /// This utility allows using the UDF without requiring access to the registry. + /// creates a logical expression with a call of the UDAF + /// This utility allows using the UDAF without requiring access to the registry. pub fn call(&self, args: Vec<Expr>) -> Expr { - Expr::ScalarUDF { + Expr::AggregateUDF { fun: Arc::new(self.clone()), args, } } } - -/// Create a physical expression of the UDF. -/// This function errors when `args`' can't be coerced to a valid argument type of the UDF. -pub fn create_physical_expr( - fun: &ScalarUDF, - input_phy_exprs: &[Arc<dyn PhysicalExpr>], - input_schema: &Schema, -) -> Result<Arc<dyn PhysicalExpr>> { - // coerce - let coerced_phy_exprs = coerce(input_phy_exprs, input_schema, &fun.signature)?; - - let coerced_exprs_types = coerced_phy_exprs - .iter() - .map(|e| e.data_type(input_schema)) - .collect::<Result<Vec<_>>>()?; - - Ok(Arc::new(ScalarFunctionExpr::new( - &fun.name, - fun.fun.clone(), - coerced_phy_exprs, - (fun.return_type)(&coerced_exprs_types)?.as_ref(), - ))) -} diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion-expr/src/udf.rs similarity index 81% copy from datafusion/src/physical_plan/udf.rs copy to datafusion-expr/src/udf.rs index 7355746..9e7bebc 100644 --- a/datafusion/src/physical_plan/udf.rs +++ b/datafusion-expr/src/udf.rs @@ -101,26 +101,3 @@ impl ScalarUDF { } } } - -/// Create a physical expression of the UDF. -/// This function errors when `args`' can't be coerced to a valid argument type of the UDF. -pub fn create_physical_expr( - fun: &ScalarUDF, - input_phy_exprs: &[Arc<dyn PhysicalExpr>], - input_schema: &Schema, -) -> Result<Arc<dyn PhysicalExpr>> { - // coerce - let coerced_phy_exprs = coerce(input_phy_exprs, input_schema, &fun.signature)?; - - let coerced_exprs_types = coerced_phy_exprs - .iter() - .map(|e| e.data_type(input_schema)) - .collect::<Result<Vec<_>>>()?; - - Ok(Arc::new(ScalarFunctionExpr::new( - &fun.name, - fun.fun.clone(), - coerced_phy_exprs, - (fun.return_type)(&coerced_exprs_types)?.as_ref(), - ))) -} diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion/src/physical_plan/udf.rs index 7355746..85e6b02 100644 --- a/datafusion/src/physical_plan/udf.rs +++ b/datafusion/src/physical_plan/udf.rs @@ -33,74 +33,7 @@ use super::{ }; use std::sync::Arc; -/// Logical representation of a UDF. -#[derive(Clone)] -pub struct ScalarUDF { - /// name - pub name: String, - /// signature - pub signature: Signature, - /// Return type - pub return_type: ReturnTypeFunction, - /// actual implementation - /// - /// The fn param is the wrapped function but be aware that the function will - /// be passed with the slice / vec of columnar values (either scalar or array) - /// with the exception of zero param function, where a singular element vec - /// will be passed. In that case the single element is a null array to indicate - /// the batch's row count (so that the generative zero-argument function can know - /// the result array size). - pub fun: ScalarFunctionImplementation, -} - -impl Debug for ScalarUDF { - fn fmt(&self, f: &mut Formatter) -> fmt::Result { - f.debug_struct("ScalarUDF") - .field("name", &self.name) - .field("signature", &self.signature) - .field("fun", &"<FUNC>") - .finish() - } -} - -impl PartialEq for ScalarUDF { - fn eq(&self, other: &Self) -> bool { - self.name == other.name && self.signature == other.signature - } -} - -impl std::hash::Hash for ScalarUDF { - fn hash<H: std::hash::Hasher>(&self, state: &mut H) { - self.name.hash(state); - self.signature.hash(state); - } -} - -impl ScalarUDF { - /// Create a new ScalarUDF - pub fn new( - name: &str, - signature: &Signature, - return_type: &ReturnTypeFunction, - fun: &ScalarFunctionImplementation, - ) -> Self { - Self { - name: name.to_owned(), - signature: signature.clone(), - return_type: return_type.clone(), - fun: fun.clone(), - } - } - - /// creates a logical expression with a call of the UDF - /// This utility allows using the UDF without requiring access to the registry. - pub fn call(&self, args: Vec<Expr>) -> Expr { - Expr::ScalarUDF { - fun: Arc::new(self.clone()), - args, - } - } -} +pub use datafusion_expr::ScalarUDF; /// Create a physical expression of the UDF. /// This function errors when `args`' can't be coerced to a valid argument type of the UDF.