This is an automated email from the ASF dual-hosted git repository. jiayuliu pushed a commit to branch datafusion-expr in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit b53e5a16dfec188d19fc69c481c4ccf29985b952 Author: Jiayu Liu <ji...@hey.com> AuthorDate: Sun Feb 6 13:45:28 2022 +0800 add datafusion-expr module --- Cargo.toml | 1 + Cargo.toml => datafusion-expr/Cargo.toml | 38 ++++--- datafusion-expr/src/aggregate_function.rs | 93 ++++++++++++++++ datafusion-expr/src/lib.rs | 22 ++++ datafusion-expr/src/window_function.rs | 133 +++++++++++++++++++++++ datafusion/Cargo.toml | 1 + datafusion/src/physical_plan/aggregates.rs | 76 +------------ datafusion/src/physical_plan/window_functions.rs | 119 +------------------- 8 files changed, 277 insertions(+), 206 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 81f6bb5..f74f53c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "datafusion", "datafusion-common", + "datafusion-expr", "datafusion-cli", "datafusion-examples", "benchmarks", diff --git a/Cargo.toml b/datafusion-expr/Cargo.toml similarity index 54% copy from Cargo.toml copy to datafusion-expr/Cargo.toml index 81f6bb5..3cac735 100644 --- a/Cargo.toml +++ b/datafusion-expr/Cargo.toml @@ -15,20 +15,26 @@ # specific language governing permissions and limitations # under the License. -[workspace] -members = [ - "datafusion", - "datafusion-common", - "datafusion-cli", - "datafusion-examples", - "benchmarks", - "ballista/rust/client", - "ballista/rust/core", - "ballista/rust/executor", - "ballista/rust/scheduler", - "ballista-examples", -] +[package] +name = "datafusion-expr" +description = "DataFusion is an in-memory query engine that uses Apache Arrow as the memory model" +version = "6.0.0" +homepage = "https://github.com/apache/arrow-datafusion" +repository = "https://github.com/apache/arrow-datafusion" +readme = "../README.md" +authors = ["Apache Arrow <d...@arrow.apache.org>"] +license = "Apache-2.0" +keywords = [ "arrow", "query", "sql" ] +publish = false +edition = "2021" +rust-version = "1.58" -[profile.release] -lto = true -codegen-units = 1 +[lib] +name = "datafusion_expr" +path = "src/lib.rs" + +[features] + +[dependencies] +datafusion-common = { path = "../datafusion-common" } +arrow = { version = "8.0.0", features = ["prettyprint"] } diff --git a/datafusion-expr/src/aggregate_function.rs b/datafusion-expr/src/aggregate_function.rs new file mode 100644 index 0000000..8f12e88 --- /dev/null +++ b/datafusion-expr/src/aggregate_function.rs @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_common::{DataFusionError, Result}; +use std::{fmt, str::FromStr}; + +/// Enum of all built-in aggregate functions +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] +pub enum AggregateFunction { + /// count + Count, + /// sum + Sum, + /// min + Min, + /// max + Max, + /// avg + Avg, + /// Approximate aggregate function + ApproxDistinct, + /// array_agg + ArrayAgg, + /// Variance (Sample) + Variance, + /// Variance (Population) + VariancePop, + /// Standard Deviation (Sample) + Stddev, + /// Standard Deviation (Population) + StddevPop, + /// Covariance (Sample) + Covariance, + /// Covariance (Population) + CovariancePop, + /// Correlation + Correlation, + /// Approximate continuous percentile function + ApproxPercentileCont, +} + +impl fmt::Display for AggregateFunction { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + // uppercase of the debug. + write!(f, "{}", format!("{:?}", self).to_uppercase()) + } +} + +impl FromStr for AggregateFunction { + type Err = DataFusionError; + fn from_str(name: &str) -> Result<AggregateFunction> { + Ok(match name { + "min" => AggregateFunction::Min, + "max" => AggregateFunction::Max, + "count" => AggregateFunction::Count, + "avg" => AggregateFunction::Avg, + "sum" => AggregateFunction::Sum, + "approx_distinct" => AggregateFunction::ApproxDistinct, + "array_agg" => AggregateFunction::ArrayAgg, + "var" => AggregateFunction::Variance, + "var_samp" => AggregateFunction::Variance, + "var_pop" => AggregateFunction::VariancePop, + "stddev" => AggregateFunction::Stddev, + "stddev_samp" => AggregateFunction::Stddev, + "stddev_pop" => AggregateFunction::StddevPop, + "covar" => AggregateFunction::Covariance, + "covar_samp" => AggregateFunction::Covariance, + "covar_pop" => AggregateFunction::CovariancePop, + "corr" => AggregateFunction::Correlation, + "approx_percentile_cont" => AggregateFunction::ApproxPercentileCont, + _ => { + return Err(DataFusionError::Plan(format!( + "There is no built-in function named {}", + name + ))); + } + }) + } +} diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs new file mode 100644 index 0000000..b6eaaf7 --- /dev/null +++ b/datafusion-expr/src/lib.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod aggregate_function; +mod window_function; + +pub use aggregate_function::AggregateFunction; +pub use window_function::{BuiltInWindowFunction, WindowFunction}; diff --git a/datafusion-expr/src/window_function.rs b/datafusion-expr/src/window_function.rs new file mode 100644 index 0000000..2511874 --- /dev/null +++ b/datafusion-expr/src/window_function.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::aggregate_function::AggregateFunction; +use datafusion_common::{DataFusionError, Result}; +use std::{fmt, str::FromStr}; + +/// WindowFunction +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum WindowFunction { + /// window function that leverages an aggregate function + AggregateFunction(AggregateFunction), + /// window function that leverages a built-in window function + BuiltInWindowFunction(BuiltInWindowFunction), +} + +impl FromStr for WindowFunction { + type Err = DataFusionError; + fn from_str(name: &str) -> Result<WindowFunction> { + let name = name.to_lowercase(); + if let Ok(aggregate) = AggregateFunction::from_str(name.as_str()) { + Ok(WindowFunction::AggregateFunction(aggregate)) + } else if let Ok(built_in_function) = + BuiltInWindowFunction::from_str(name.as_str()) + { + Ok(WindowFunction::BuiltInWindowFunction(built_in_function)) + } else { + Err(DataFusionError::Plan(format!( + "There is no window function named {}", + name + ))) + } + } +} + +impl fmt::Display for BuiltInWindowFunction { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + BuiltInWindowFunction::RowNumber => write!(f, "ROW_NUMBER"), + BuiltInWindowFunction::Rank => write!(f, "RANK"), + BuiltInWindowFunction::DenseRank => write!(f, "DENSE_RANK"), + BuiltInWindowFunction::PercentRank => write!(f, "PERCENT_RANK"), + BuiltInWindowFunction::CumeDist => write!(f, "CUME_DIST"), + BuiltInWindowFunction::Ntile => write!(f, "NTILE"), + BuiltInWindowFunction::Lag => write!(f, "LAG"), + BuiltInWindowFunction::Lead => write!(f, "LEAD"), + BuiltInWindowFunction::FirstValue => write!(f, "FIRST_VALUE"), + BuiltInWindowFunction::LastValue => write!(f, "LAST_VALUE"), + BuiltInWindowFunction::NthValue => write!(f, "NTH_VALUE"), + } + } +} + +impl fmt::Display for WindowFunction { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + WindowFunction::AggregateFunction(fun) => fun.fmt(f), + WindowFunction::BuiltInWindowFunction(fun) => fun.fmt(f), + } + } +} + +/// An aggregate function that is part of a built-in window function +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum BuiltInWindowFunction { + /// number of the current row within its partition, counting from 1 + RowNumber, + /// rank of the current row with gaps; same as row_number of its first peer + Rank, + /// ank of the current row without gaps; this function counts peer groups + DenseRank, + /// relative rank of the current row: (rank - 1) / (total rows - 1) + PercentRank, + /// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows) + CumeDist, + /// integer ranging from 1 to the argument value, dividing the partition as equally as possible + Ntile, + /// returns value evaluated at the row that is offset rows before the current row within the partition; + /// if there is no such row, instead return default (which must be of the same type as value). + /// Both offset and default are evaluated with respect to the current row. + /// If omitted, offset defaults to 1 and default to null + Lag, + /// returns value evaluated at the row that is offset rows after the current row within the partition; + /// if there is no such row, instead return default (which must be of the same type as value). + /// Both offset and default are evaluated with respect to the current row. + /// If omitted, offset defaults to 1 and default to null + Lead, + /// returns value evaluated at the row that is the first row of the window frame + FirstValue, + /// returns value evaluated at the row that is the last row of the window frame + LastValue, + /// returns value evaluated at the row that is the nth row of the window frame (counting from 1); null if no such row + NthValue, +} + +impl FromStr for BuiltInWindowFunction { + type Err = DataFusionError; + fn from_str(name: &str) -> Result<BuiltInWindowFunction> { + Ok(match name.to_uppercase().as_str() { + "ROW_NUMBER" => BuiltInWindowFunction::RowNumber, + "RANK" => BuiltInWindowFunction::Rank, + "DENSE_RANK" => BuiltInWindowFunction::DenseRank, + "PERCENT_RANK" => BuiltInWindowFunction::PercentRank, + "CUME_DIST" => BuiltInWindowFunction::CumeDist, + "NTILE" => BuiltInWindowFunction::Ntile, + "LAG" => BuiltInWindowFunction::Lag, + "LEAD" => BuiltInWindowFunction::Lead, + "FIRST_VALUE" => BuiltInWindowFunction::FirstValue, + "LAST_VALUE" => BuiltInWindowFunction::LastValue, + "NTH_VALUE" => BuiltInWindowFunction::NthValue, + _ => { + return Err(DataFusionError::Plan(format!( + "There is no built-in window function named {}", + name + ))) + } + }) + } +} diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 180e037..51f78b4 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -51,6 +51,7 @@ avro = ["avro-rs", "num-traits", "datafusion-common/avro"] [dependencies] datafusion-common = { path = "../datafusion-common" } +datafusion-expr = { path = "../datafusion-expr" } ahash = { version = "0.7", default-features = false } hashbrown = { version = "0.12", features = ["raw"] } arrow = { version = "8.0.0", features = ["prettyprint"] } diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs index 8fc94d3..a1531d4 100644 --- a/datafusion/src/physical_plan/aggregates.rs +++ b/datafusion/src/physical_plan/aggregates.rs @@ -38,7 +38,7 @@ use expressions::{ avg_return_type, correlation_return_type, covariance_return_type, stddev_return_type, sum_return_type, variance_return_type, }; -use std::{fmt, str::FromStr, sync::Arc}; +use std::sync::Arc; /// the implementation of an aggregate function pub type AccumulatorFunctionImplementation = @@ -49,79 +49,7 @@ pub type AccumulatorFunctionImplementation = pub type StateTypeFunction = Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>; -/// Enum of all built-in aggregate functions -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] -pub enum AggregateFunction { - /// count - Count, - /// sum - Sum, - /// min - Min, - /// max - Max, - /// avg - Avg, - /// Approximate aggregate function - ApproxDistinct, - /// array_agg - ArrayAgg, - /// Variance (Sample) - Variance, - /// Variance (Population) - VariancePop, - /// Standard Deviation (Sample) - Stddev, - /// Standard Deviation (Population) - StddevPop, - /// Covariance (Sample) - Covariance, - /// Covariance (Population) - CovariancePop, - /// Correlation - Correlation, - /// Approximate continuous percentile function - ApproxPercentileCont, -} - -impl fmt::Display for AggregateFunction { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - // uppercase of the debug. - write!(f, "{}", format!("{:?}", self).to_uppercase()) - } -} - -impl FromStr for AggregateFunction { - type Err = DataFusionError; - fn from_str(name: &str) -> Result<AggregateFunction> { - Ok(match name { - "min" => AggregateFunction::Min, - "max" => AggregateFunction::Max, - "count" => AggregateFunction::Count, - "avg" => AggregateFunction::Avg, - "sum" => AggregateFunction::Sum, - "approx_distinct" => AggregateFunction::ApproxDistinct, - "array_agg" => AggregateFunction::ArrayAgg, - "var" => AggregateFunction::Variance, - "var_samp" => AggregateFunction::Variance, - "var_pop" => AggregateFunction::VariancePop, - "stddev" => AggregateFunction::Stddev, - "stddev_samp" => AggregateFunction::Stddev, - "stddev_pop" => AggregateFunction::StddevPop, - "covar" => AggregateFunction::Covariance, - "covar_samp" => AggregateFunction::Covariance, - "covar_pop" => AggregateFunction::CovariancePop, - "corr" => AggregateFunction::Correlation, - "approx_percentile_cont" => AggregateFunction::ApproxPercentileCont, - _ => { - return Err(DataFusionError::Plan(format!( - "There is no built-in function named {}", - name - ))); - } - }) - } -} +pub use datafusion_expr::AggregateFunction; /// Returns the datatype of the aggregate function. /// This is used to get the returned data type for aggregate expr. diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs index 178a55a..1dcac3f 100644 --- a/datafusion/src/physical_plan/window_functions.rs +++ b/datafusion/src/physical_plan/window_functions.rs @@ -23,130 +23,17 @@ use crate::error::{DataFusionError, Result}; use crate::physical_plan::functions::{TypeSignature, Volatility}; use crate::physical_plan::{ - aggregates, aggregates::AggregateFunction, functions::Signature, - type_coercion::data_types, windows::find_ranges_in_range, PhysicalExpr, + aggregates, functions::Signature, type_coercion::data_types, + windows::find_ranges_in_range, PhysicalExpr, }; use arrow::array::ArrayRef; use arrow::datatypes::DataType; use arrow::datatypes::Field; use arrow::record_batch::RecordBatch; +pub use datafusion_expr::{BuiltInWindowFunction, WindowFunction}; use std::any::Any; use std::ops::Range; use std::sync::Arc; -use std::{fmt, str::FromStr}; - -/// WindowFunction -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum WindowFunction { - /// window function that leverages an aggregate function - AggregateFunction(AggregateFunction), - /// window function that leverages a built-in window function - BuiltInWindowFunction(BuiltInWindowFunction), -} - -impl FromStr for WindowFunction { - type Err = DataFusionError; - fn from_str(name: &str) -> Result<WindowFunction> { - let name = name.to_lowercase(); - if let Ok(aggregate) = AggregateFunction::from_str(name.as_str()) { - Ok(WindowFunction::AggregateFunction(aggregate)) - } else if let Ok(built_in_function) = - BuiltInWindowFunction::from_str(name.as_str()) - { - Ok(WindowFunction::BuiltInWindowFunction(built_in_function)) - } else { - Err(DataFusionError::Plan(format!( - "There is no window function named {}", - name - ))) - } - } -} - -impl fmt::Display for BuiltInWindowFunction { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - BuiltInWindowFunction::RowNumber => write!(f, "ROW_NUMBER"), - BuiltInWindowFunction::Rank => write!(f, "RANK"), - BuiltInWindowFunction::DenseRank => write!(f, "DENSE_RANK"), - BuiltInWindowFunction::PercentRank => write!(f, "PERCENT_RANK"), - BuiltInWindowFunction::CumeDist => write!(f, "CUME_DIST"), - BuiltInWindowFunction::Ntile => write!(f, "NTILE"), - BuiltInWindowFunction::Lag => write!(f, "LAG"), - BuiltInWindowFunction::Lead => write!(f, "LEAD"), - BuiltInWindowFunction::FirstValue => write!(f, "FIRST_VALUE"), - BuiltInWindowFunction::LastValue => write!(f, "LAST_VALUE"), - BuiltInWindowFunction::NthValue => write!(f, "NTH_VALUE"), - } - } -} - -impl fmt::Display for WindowFunction { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - WindowFunction::AggregateFunction(fun) => fun.fmt(f), - WindowFunction::BuiltInWindowFunction(fun) => fun.fmt(f), - } - } -} - -/// An aggregate function that is part of a built-in window function -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum BuiltInWindowFunction { - /// number of the current row within its partition, counting from 1 - RowNumber, - /// rank of the current row with gaps; same as row_number of its first peer - Rank, - /// ank of the current row without gaps; this function counts peer groups - DenseRank, - /// relative rank of the current row: (rank - 1) / (total rows - 1) - PercentRank, - /// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows) - CumeDist, - /// integer ranging from 1 to the argument value, dividing the partition as equally as possible - Ntile, - /// returns value evaluated at the row that is offset rows before the current row within the partition; - /// if there is no such row, instead return default (which must be of the same type as value). - /// Both offset and default are evaluated with respect to the current row. - /// If omitted, offset defaults to 1 and default to null - Lag, - /// returns value evaluated at the row that is offset rows after the current row within the partition; - /// if there is no such row, instead return default (which must be of the same type as value). - /// Both offset and default are evaluated with respect to the current row. - /// If omitted, offset defaults to 1 and default to null - Lead, - /// returns value evaluated at the row that is the first row of the window frame - FirstValue, - /// returns value evaluated at the row that is the last row of the window frame - LastValue, - /// returns value evaluated at the row that is the nth row of the window frame (counting from 1); null if no such row - NthValue, -} - -impl FromStr for BuiltInWindowFunction { - type Err = DataFusionError; - fn from_str(name: &str) -> Result<BuiltInWindowFunction> { - Ok(match name.to_uppercase().as_str() { - "ROW_NUMBER" => BuiltInWindowFunction::RowNumber, - "RANK" => BuiltInWindowFunction::Rank, - "DENSE_RANK" => BuiltInWindowFunction::DenseRank, - "PERCENT_RANK" => BuiltInWindowFunction::PercentRank, - "CUME_DIST" => BuiltInWindowFunction::CumeDist, - "NTILE" => BuiltInWindowFunction::Ntile, - "LAG" => BuiltInWindowFunction::Lag, - "LEAD" => BuiltInWindowFunction::Lead, - "FIRST_VALUE" => BuiltInWindowFunction::FirstValue, - "LAST_VALUE" => BuiltInWindowFunction::LastValue, - "NTH_VALUE" => BuiltInWindowFunction::NthValue, - _ => { - return Err(DataFusionError::Plan(format!( - "There is no built-in window function named {}", - name - ))) - } - }) - } -} /// Returns the datatype of the window function pub fn return_type(