jorgecarleitao commented on a change in pull request #8639: URL: https://github.com/apache/arrow/pull/8639#discussion_r521494620
########## File path: rust/datafusion/src/logical_plan/mod.rs ########## @@ -21,2300 +21,21 @@ //! Logical query plans can then be optimized and executed directly, or translated into //! physical query plans and executed. -use std::fmt::{self, Debug, Display}; -use std::{any::Any, collections::HashMap, collections::HashSet, sync::Arc}; - -use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction}; -use arrow::{ - compute::can_cast_types, - datatypes::{DataType, Field, Schema, SchemaRef}, -}; - -use crate::datasource::parquet::ParquetTable; -use crate::datasource::TableProvider; -use crate::error::{DataFusionError, Result}; -use crate::{ - datasource::csv::{CsvFile, CsvReadOptions}, - physical_plan::udaf::AggregateUDF, - scalar::ScalarValue, -}; -use crate::{ - physical_plan::{ - aggregates, expressions::binary_operator_data_type, functions, udf::ScalarUDF, - }, - sql::parser::FileType, -}; -use arrow::record_batch::RecordBatch; -use functions::{ReturnTypeFunction, ScalarFunctionImplementation, Signature}; - +mod builder; +mod display; +mod expr; +mod extension; mod operators; +mod plan; +mod registry; + +pub use builder::LogicalPlanBuilder; +pub use display::display_schema; +pub use expr::{ + and, array, avg, binary_expr, col, concat, count, create_udaf, create_udf, + exprlist_to_fields, length, lit, max, min, sum, Expr, Literal, +}; +pub use extension::UserDefinedLogicalNode; pub use operators::Operator; - -fn create_function_name( - fun: &String, - distinct: bool, - args: &[Expr], - input_schema: &Schema, -) -> Result<String> { - let names: Vec<String> = args - .iter() - .map(|e| create_name(e, input_schema)) - .collect::<Result<_>>()?; - let distinct_str = match distinct { - true => "DISTINCT ", - false => "", - }; - Ok(format!("{}({}{})", fun, distinct_str, names.join(","))) -} - -/// Returns a readable name of an expression based on the input schema. -/// This function recursively transverses the expression for names such as "CAST(a > 2)". 
-fn create_name(e: &Expr, input_schema: &Schema) -> Result<String> { - match e { - Expr::Alias(_, name) => Ok(name.clone()), - Expr::Column(name) => Ok(name.clone()), - Expr::ScalarVariable(variable_names) => Ok(variable_names.join(".")), - Expr::Literal(value) => Ok(format!("{:?}", value)), - Expr::BinaryExpr { left, op, right } => { - let left = create_name(left, input_schema)?; - let right = create_name(right, input_schema)?; - Ok(format!("{} {:?} {}", left, op, right)) - } - Expr::Cast { expr, data_type } => { - let expr = create_name(expr, input_schema)?; - Ok(format!("CAST({} AS {:?})", expr, data_type)) - } - Expr::Not(expr) => { - let expr = create_name(expr, input_schema)?; - Ok(format!("NOT {}", expr)) - } - Expr::IsNull(expr) => { - let expr = create_name(expr, input_schema)?; - Ok(format!("{} IS NULL", expr)) - } - Expr::IsNotNull(expr) => { - let expr = create_name(expr, input_schema)?; - Ok(format!("{} IS NOT NULL", expr)) - } - Expr::ScalarFunction { fun, args, .. } => { - create_function_name(&fun.to_string(), false, args, input_schema) - } - Expr::ScalarUDF { fun, args, .. } => { - create_function_name(&fun.name, false, args, input_schema) - } - Expr::AggregateFunction { - fun, - distinct, - args, - .. - } => create_function_name(&fun.to_string(), *distinct, args, input_schema), - Expr::AggregateUDF { fun, args } => { - let mut names = Vec::with_capacity(args.len()); - for e in args { - names.push(create_name(e, input_schema)?); - } - Ok(format!("{}({})", fun.name, names.join(","))) - } - other => Err(DataFusionError::NotImplemented(format!( - "Physical plan does not support logical expression {:?}", - other - ))), - } -} - -/// Create field meta-data from an expression, for use in a result set schema -pub fn exprlist_to_fields(expr: &[Expr], input_schema: &Schema) -> Result<Vec<Field>> { - expr.iter().map(|e| e.to_field(input_schema)).collect() -} - -/// `Expr` is a logical expression. 
A logical expression is something like `1 + 1`, or `CAST(c1 AS int)`. -/// Logical expressions know how to compute its [arrow::datatypes::DataType] and nullability. -/// `Expr` is a central struct of DataFusion's query API. -/// -/// # Examples -/// -/// ``` -/// # use datafusion::logical_plan::Expr; -/// # use datafusion::error::Result; -/// # fn main() -> Result<()> { -/// let expr = Expr::Column("c1".to_string()) + Expr::Column("c2".to_string()); -/// println!("{:?}", expr); -/// # Ok(()) -/// # } -/// ``` -#[derive(Clone)] -pub enum Expr { - /// An expression with a specific name. - Alias(Box<Expr>, String), - /// A named reference to a field in a schema. - Column(String), - /// A named reference to a variable in a registry. - ScalarVariable(Vec<String>), - /// A constant value. - Literal(ScalarValue), - /// A binary expression such as "age > 21" - BinaryExpr { - /// Left-hand side of the expression - left: Box<Expr>, - /// The comparison operator - op: Operator, - /// Right-hand side of the expression - right: Box<Expr>, - }, - /// Parenthesized expression. E.g. `(foo > bar)` or `(1)` - Nested(Box<Expr>), - /// Negation of an expression. The expression's type must be a boolean to make sense. - Not(Box<Expr>), - /// Whether an expression is not Null. This expression is never null. - IsNotNull(Box<Expr>), - /// Whether an expression is Null. This expression is never null. - IsNull(Box<Expr>), - /// Casts the expression to a given type. This expression is guaranteed to have a fixed type. - Cast { - /// The expression being cast - expr: Box<Expr>, - /// The `DataType` the expression will yield - data_type: DataType, - }, - /// A sort expression, that can be used to sort values. - Sort { - /// The expression to sort on - expr: Box<Expr>, - /// The direction of the sort - asc: bool, - /// Whether to put Nulls before all other data values - nulls_first: bool, - }, - /// Represents the call of a built-in scalar function with a set of arguments. 
- ScalarFunction { - /// The function - fun: functions::BuiltinScalarFunction, - /// List of expressions to feed to the functions as arguments - args: Vec<Expr>, - }, - /// Represents the call of a user-defined scalar function with arguments. - ScalarUDF { - /// The function - fun: Arc<ScalarUDF>, - /// List of expressions to feed to the functions as arguments - args: Vec<Expr>, - }, - /// Represents the call of an aggregate built-in function with arguments. - AggregateFunction { - /// Name of the function - fun: aggregates::AggregateFunction, - /// List of expressions to feed to the functions as arguments - args: Vec<Expr>, - /// Whether this is a DISTINCT aggregation or not - distinct: bool, - }, - /// aggregate function - AggregateUDF { - /// The function - fun: Arc<AggregateUDF>, - /// List of expressions to feed to the functions as arguments - args: Vec<Expr>, - }, - /// Represents a reference to all fields in a schema. - Wildcard, -} - -impl Expr { - /// Returns the [arrow::datatypes::DataType] of the expression based on [arrow::datatypes::Schema]. - /// - /// # Errors - /// - /// This function errors when it is not possible to compute its [arrow::datatypes::DataType]. - /// This happens when e.g. the expression refers to a column that does not exist in the schema, or when - /// the expression is incorrectly typed (e.g. `[utf8] + [bool]`). - pub fn get_type(&self, schema: &Schema) -> Result<DataType> { - match self { - Expr::Alias(expr, _) => expr.get_type(schema), - Expr::Column(name) => Ok(schema.field_with_name(name)?.data_type().clone()), - Expr::ScalarVariable(_) => Ok(DataType::Utf8), - Expr::Literal(l) => Ok(l.get_datatype()), - Expr::Cast { data_type, .. 
} => Ok(data_type.clone()), - Expr::ScalarUDF { fun, args } => { - let data_types = args - .iter() - .map(|e| e.get_type(schema)) - .collect::<Result<Vec<_>>>()?; - Ok((fun.return_type)(&data_types)?.as_ref().clone()) - } - Expr::ScalarFunction { fun, args } => { - let data_types = args - .iter() - .map(|e| e.get_type(schema)) - .collect::<Result<Vec<_>>>()?; - functions::return_type(fun, &data_types) - } - Expr::AggregateFunction { fun, args, .. } => { - let data_types = args - .iter() - .map(|e| e.get_type(schema)) - .collect::<Result<Vec<_>>>()?; - aggregates::return_type(fun, &data_types) - } - Expr::AggregateUDF { fun, args, .. } => { - let data_types = args - .iter() - .map(|e| e.get_type(schema)) - .collect::<Result<Vec<_>>>()?; - Ok((fun.return_type)(&data_types)?.as_ref().clone()) - } - Expr::Not(_) => Ok(DataType::Boolean), - Expr::IsNull(_) => Ok(DataType::Boolean), - Expr::IsNotNull(_) => Ok(DataType::Boolean), - Expr::BinaryExpr { - ref left, - ref right, - ref op, - } => binary_operator_data_type( - &left.get_type(schema)?, - op, - &right.get_type(schema)?, - ), - Expr::Sort { ref expr, .. } => expr.get_type(schema), - Expr::Wildcard => Err(DataFusionError::Internal( - "Wildcard expressions are not valid in a logical query plan".to_owned(), - )), - Expr::Nested(e) => e.get_type(schema), - } - } - - /// Returns the nullability of the expression based on [arrow::datatypes::Schema]. - /// - /// # Errors - /// - /// This function errors when it is not possible to compute its nullability. - /// This happens when the expression refers to a column that does not exist in the schema. - pub fn nullable(&self, input_schema: &Schema) -> Result<bool> { - match self { - Expr::Alias(expr, _) => expr.nullable(input_schema), - Expr::Column(name) => Ok(input_schema.field_with_name(name)?.is_nullable()), - Expr::Literal(value) => Ok(value.is_null()), - Expr::ScalarVariable(_) => Ok(true), - Expr::Cast { expr, .. 
} => expr.nullable(input_schema), - Expr::ScalarFunction { .. } => Ok(true), - Expr::ScalarUDF { .. } => Ok(true), - Expr::AggregateFunction { .. } => Ok(true), - Expr::AggregateUDF { .. } => Ok(true), - Expr::Not(expr) => expr.nullable(input_schema), - Expr::IsNull(_) => Ok(false), - Expr::IsNotNull(_) => Ok(false), - Expr::BinaryExpr { - ref left, - ref right, - .. - } => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?), - Expr::Sort { ref expr, .. } => expr.nullable(input_schema), - Expr::Nested(e) => e.nullable(input_schema), - Expr::Wildcard => Err(DataFusionError::Internal( - "Wildcard expressions are not valid in a logical query plan".to_owned(), - )), - } - } - - /// Returns the name of this expression based on [arrow::datatypes::Schema]. - /// - /// This represents how a column with this expression is named when no alias is chosen - pub fn name(&self, input_schema: &Schema) -> Result<String> { - create_name(self, input_schema) - } - - /// Returns a [arrow::datatypes::Field] compatible with this expression. - pub fn to_field(&self, input_schema: &Schema) -> Result<Field> { - Ok(Field::new( - &self.name(input_schema)?, - self.get_type(input_schema)?, - self.nullable(input_schema)?, - )) - } - - /// Wraps this expression in a cast to a target [arrow::datatypes::DataType]. - /// - /// # Errors - /// - /// This function errors when it is impossible to cast the - /// expression to the target [arrow::datatypes::DataType]. 
- pub fn cast_to(&self, cast_to_type: &DataType, schema: &Schema) -> Result<Expr> { - let this_type = self.get_type(schema)?; - if this_type == *cast_to_type { - Ok(self.clone()) - } else if can_cast_types(&this_type, cast_to_type) { - Ok(Expr::Cast { - expr: Box::new(self.clone()), - data_type: cast_to_type.clone(), - }) - } else { - Err(DataFusionError::Plan(format!( - "Cannot automatically convert {:?} to {:?}", - this_type, cast_to_type - ))) - } - } - - /// Equal - pub fn eq(&self, other: Expr) -> Expr { - binary_expr(self.clone(), Operator::Eq, other.clone()) - } - - /// Not equal - pub fn not_eq(&self, other: Expr) -> Expr { - binary_expr(self.clone(), Operator::NotEq, other.clone()) - } - - /// Greater than - pub fn gt(&self, other: Expr) -> Expr { - binary_expr(self.clone(), Operator::Gt, other.clone()) - } - - /// Greater than or equal to - pub fn gt_eq(&self, other: Expr) -> Expr { - binary_expr(self.clone(), Operator::GtEq, other.clone()) - } - - /// Less than - pub fn lt(&self, other: Expr) -> Expr { - binary_expr(self.clone(), Operator::Lt, other.clone()) - } - - /// Less than or equal to - pub fn lt_eq(&self, other: Expr) -> Expr { - binary_expr(self.clone(), Operator::LtEq, other.clone()) - } - - /// And - pub fn and(&self, other: Expr) -> Expr { - binary_expr(self.clone(), Operator::And, other) - } - - /// Or - pub fn or(&self, other: Expr) -> Expr { - binary_expr(self.clone(), Operator::Or, other) - } - - /// Not - pub fn not(&self) -> Expr { - Expr::Not(Box::new(self.clone())) - } - - /// Calculate the modulus of two expressions - pub fn modulus(&self, other: Expr) -> Expr { - binary_expr(self.clone(), Operator::Modulus, other.clone()) - } - - /// like (string) another expression - pub fn like(&self, other: Expr) -> Expr { - binary_expr(self.clone(), Operator::Like, other.clone()) - } - - /// not like another expression - pub fn not_like(&self, other: Expr) -> Expr { - binary_expr(self.clone(), Operator::NotLike, other.clone()) - } - - /// Alias 
- pub fn alias(&self, name: &str) -> Expr { - Expr::Alias(Box::new(self.clone()), name.to_owned()) - } - - /// Create a sort expression from an existing expression. - /// - /// ``` - /// # use datafusion::logical_plan::col; - /// let sort_expr = col("foo").sort(true, true); // SORT ASC NULLS_FIRST - /// ``` - pub fn sort(&self, asc: bool, nulls_first: bool) -> Expr { - Expr::Sort { - expr: Box::new(self.clone()), - asc, - nulls_first, - } - } -} - -fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr { - Expr::BinaryExpr { - left: Box::new(l), - op, - right: Box::new(r), - } -} - -/// return a new expression with a logical AND -pub fn and(left: &Expr, right: &Expr) -> Expr { - Expr::BinaryExpr { - left: Box::new(left.clone()), - op: Operator::And, - right: Box::new(right.clone()), - } -} - -/// Create a column expression based on a column name -pub fn col(name: &str) -> Expr { - Expr::Column(name.to_owned()) -} - -/// Create an expression to represent the min() aggregate function -pub fn min(expr: Expr) -> Expr { - Expr::AggregateFunction { - fun: aggregates::AggregateFunction::Min, - distinct: false, - args: vec![expr], - } -} - -/// Create an expression to represent the max() aggregate function -pub fn max(expr: Expr) -> Expr { - Expr::AggregateFunction { - fun: aggregates::AggregateFunction::Max, - distinct: false, - args: vec![expr], - } -} - -/// Create an expression to represent the sum() aggregate function -pub fn sum(expr: Expr) -> Expr { - Expr::AggregateFunction { - fun: aggregates::AggregateFunction::Sum, - distinct: false, - args: vec![expr], - } -} - -/// Create an expression to represent the avg() aggregate function -pub fn avg(expr: Expr) -> Expr { - Expr::AggregateFunction { - fun: aggregates::AggregateFunction::Avg, - distinct: false, - args: vec![expr], - } -} - -/// Create an expression to represent the count() aggregate function -pub fn count(expr: Expr) -> Expr { - Expr::AggregateFunction { - fun: aggregates::AggregateFunction::Count, - 
distinct: false, - args: vec![expr], - } -} - -/// Whether it can be represented as a literal expression -pub trait Literal { - /// convert the value to a Literal expression - fn lit(&self) -> Expr; -} - -impl Literal for &str { - fn lit(&self) -> Expr { - Expr::Literal(ScalarValue::Utf8(Some((*self).to_owned()))) - } -} - -impl Literal for String { - fn lit(&self) -> Expr { - Expr::Literal(ScalarValue::Utf8(Some((*self).to_owned()))) - } -} - -macro_rules! make_literal { - ($TYPE:ty, $SCALAR:ident) => { - #[allow(missing_docs)] - impl Literal for $TYPE { - fn lit(&self) -> Expr { - Expr::Literal(ScalarValue::$SCALAR(Some(self.clone()))) - } - } - }; -} - -make_literal!(bool, Boolean); -make_literal!(f32, Float32); -make_literal!(f64, Float64); -make_literal!(i8, Int8); -make_literal!(i16, Int16); -make_literal!(i32, Int32); -make_literal!(i64, Int64); -make_literal!(u8, UInt8); -make_literal!(u16, UInt16); -make_literal!(u32, UInt32); -make_literal!(u64, UInt64); - -/// Create a literal expression -pub fn lit<T: Literal>(n: T) -> Expr { - n.lit() -} - -/// Create an convenience function representing a unary scalar function -macro_rules! 
unary_math_expr { - ($ENUM:ident, $FUNC:ident) => { - #[allow(missing_docs)] - pub fn $FUNC(e: Expr) -> Expr { - Expr::ScalarFunction { - fun: functions::BuiltinScalarFunction::$ENUM, - args: vec![e], - } - } - }; -} - -// generate methods for creating the supported unary math expressions -unary_math_expr!(Sqrt, sqrt); -unary_math_expr!(Sin, sin); -unary_math_expr!(Cos, cos); -unary_math_expr!(Tan, tan); -unary_math_expr!(Asin, asin); -unary_math_expr!(Acos, acos); -unary_math_expr!(Atan, atan); -unary_math_expr!(Floor, floor); -unary_math_expr!(Ceil, ceil); -unary_math_expr!(Round, round); -unary_math_expr!(Trunc, trunc); -unary_math_expr!(Abs, abs); -unary_math_expr!(Signum, signum); -unary_math_expr!(Exp, exp); -unary_math_expr!(Log, ln); -unary_math_expr!(Log2, log2); Review comment: I think that they are used when building expressions via the DataFrame API. E.g., in `examples/dataframe.rs`, if we want to create an expression such as `sqrt(col("c1"))`, we need to import `sqrt` from `logical_plan`. No? I find it odd that the compiler is warning that `pub` functions are not being used, though xD ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org