This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d2fb05ed5b PhysicalExpr Orderings with Range Information (#10504)
d2fb05ed5b is described below
commit d2fb05ed5ba71fd0f1d440baca12897413c2a8af
Author: Berkay Şahin <[email protected]>
AuthorDate: Fri May 17 14:08:01 2024 +0300
PhysicalExpr Orderings with Range Information (#10504)
* Self review
* Fix null interval accumulation
* Refactor monotonicity
* Ignore failing tests
* Initial impl
* Ready for review
* Update properties.rs
* Update configs.md
Update configs.md
* cargo doc
* Add abs test
* Update properties.rs
* Update udf.rs
* Review Part 1
* Review Part 2
* Minor
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion-examples/examples/advanced_udf.rs | 28 +--
datafusion-examples/examples/function_factory.rs | 13 +-
.../src/physical_optimizer/enforce_distribution.rs | 8 +-
.../core/src/physical_optimizer/join_selection.rs | 4 +-
.../src/physical_optimizer/projection_pushdown.rs | 4 -
.../fuzz_cases/sort_preserving_repartition_fuzz.rs | 6 +-
.../user_defined/user_defined_scalar_functions.rs | 15 +-
datafusion/expr/src/interval_arithmetic.rs | 104 +++++++--
datafusion/expr/src/lib.rs | 4 +-
datafusion/expr/src/signature.rs | 8 -
.../src/sort_properties.rs | 59 +++--
datafusion/expr/src/udf.rs | 132 +++++++++--
datafusion/functions/src/datetime/date_bin.rs | 28 ++-
datafusion/functions/src/datetime/date_trunc.rs | 33 ++-
datafusion/functions/src/macros.rs | 36 +--
datafusion/functions/src/math/abs.rs | 43 ++--
datafusion/functions/src/math/log.rs | 46 ++--
datafusion/functions/src/math/mod.rs | 72 +++---
datafusion/functions/src/math/monotonicity.rs | 241 +++++++++++++++++++++
datafusion/functions/src/math/pi.rs | 10 +-
datafusion/functions/src/math/round.rs | 27 ++-
datafusion/functions/src/math/trunc.rs | 27 ++-
datafusion/physical-expr-common/src/lib.rs | 1 -
.../physical-expr-common/src/physical_expr.rs | 24 +-
datafusion/physical-expr-common/src/utils.rs | 31 ++-
datafusion/physical-expr/src/equivalence/mod.rs | 15 +-
.../physical-expr/src/equivalence/ordering.rs | 26 +--
.../physical-expr/src/equivalence/projection.rs | 25 +--
.../physical-expr/src/equivalence/properties.rs | 223 +++++++++++--------
datafusion/physical-expr/src/expressions/binary.rs | 57 +++--
datafusion/physical-expr/src/expressions/cast.rs | 28 ++-
.../physical-expr/src/expressions/literal.rs | 11 +-
.../physical-expr/src/expressions/negative.rs | 9 +-
datafusion/physical-expr/src/functions.rs | 8 +-
datafusion/physical-expr/src/lib.rs | 7 -
datafusion/physical-expr/src/scalar_function.rs | 114 +++-------
datafusion/physical-expr/src/utils/mod.rs | 11 +-
datafusion/physical-plan/src/aggregates/mod.rs | 2 +-
datafusion/physical-plan/src/filter.rs | 5 +-
datafusion/proto/src/physical_plan/from_proto.rs | 1 -
.../proto/tests/cases/roundtrip_physical_plan.rs | 2 -
datafusion/sqllogictest/test_files/order.slt | 151 +++++++++++++
42 files changed, 1196 insertions(+), 503 deletions(-)
diff --git a/datafusion-examples/examples/advanced_udf.rs
b/datafusion-examples/examples/advanced_udf.rs
index c8063c0eb1..d1ef1c6c9d 100644
--- a/datafusion-examples/examples/advanced_udf.rs
+++ b/datafusion-examples/examples/advanced_udf.rs
@@ -15,26 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion::{
- arrow::{
- array::{ArrayRef, Float32Array, Float64Array},
- datatypes::DataType,
- record_batch::RecordBatch,
- },
- logical_expr::Volatility,
-};
use std::any::Any;
+use std::sync::Arc;
-use arrow::array::{new_null_array, Array, AsArray};
+use arrow::array::{
+ new_null_array, Array, ArrayRef, AsArray, Float32Array, Float64Array,
+};
use arrow::compute;
-use arrow::datatypes::Float64Type;
+use arrow::datatypes::{DataType, Float64Type};
+use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
+use datafusion::logical_expr::Volatility;
use datafusion::prelude::*;
use datafusion_common::{internal_err, ScalarValue};
-use datafusion_expr::{
- ColumnarValue, FuncMonotonicity, ScalarUDF, ScalarUDFImpl, Signature,
-};
-use std::sync::Arc;
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature};
/// This example shows how to use the full ScalarUDFImpl API to implement a
user
/// defined function. As in the `simple_udf.rs` example, this struct implements
@@ -186,8 +181,9 @@ impl ScalarUDFImpl for PowUdf {
&self.aliases
}
- fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
- Ok(Some(vec![Some(true)]))
+ fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties>
{
+ // The POW function preserves the order of its argument.
+ Ok(input[0].sort_properties)
}
}
diff --git a/datafusion-examples/examples/function_factory.rs
b/datafusion-examples/examples/function_factory.rs
index 3973e50474..9e624b6629 100644
--- a/datafusion-examples/examples/function_factory.rs
+++ b/datafusion-examples/examples/function_factory.rs
@@ -15,17 +15,18 @@
// specific language governing permissions and limitations
// under the License.
+use std::result::Result as RResult;
+use std::sync::Arc;
+
use datafusion::error::Result;
use datafusion::execution::context::{
FunctionFactory, RegisterFunction, SessionContext, SessionState,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{exec_err, internal_err, DataFusionError};
-use datafusion_expr::simplify::ExprSimplifyResult;
-use datafusion_expr::simplify::SimplifyInfo;
+use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{CreateFunction, Expr, ScalarUDF, ScalarUDFImpl,
Signature};
-use std::result::Result as RResult;
-use std::sync::Arc;
/// This example shows how to utilize [FunctionFactory] to implement simple
/// SQL-macro like functions using a `CREATE FUNCTION` statement. The same
@@ -156,8 +157,8 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
&[]
}
- fn monotonicity(&self) ->
Result<Option<datafusion_expr::FuncMonotonicity>> {
- Ok(None)
+ fn monotonicity(&self, _input: &[ExprProperties]) ->
Result<SortProperties> {
+ Ok(SortProperties::Unordered)
}
}
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index c07f2c5dcf..cd84e911d3 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -3572,7 +3572,11 @@ pub(crate) mod tests {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
- let alias = vec![("a".to_string(), "a".to_string())];
+ let alias = vec![
+ ("a".to_string(), "a".to_string()),
+ ("b".to_string(), "b".to_string()),
+ ("c".to_string(), "c".to_string()),
+ ];
let plan = sort_preserving_merge_exec(
sort_key.clone(),
sort_exec(
@@ -3585,7 +3589,7 @@ pub(crate) mod tests {
let expected = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
// Since this projection is trivial, increasing parallelism is not
beneficial
- "ProjectionExec: expr=[a@0 as a]",
+ "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e]",
];
assert_optimized!(expected, plan.clone(), true);
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs
b/datafusion/core/src/physical_optimizer/join_selection.rs
index 042a0198bf..135a59aa03 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -39,8 +39,8 @@ use crate::physical_plan::{ExecutionPlan,
ExecutionPlanProperties};
use arrow_schema::Schema;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{internal_err, JoinSide, JoinType};
+use datafusion_expr::sort_properties::SortProperties;
use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::sort_properties::SortProperties;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
/// The [`JoinSelection`] rule tries to modify a given plan so that it can
@@ -561,7 +561,7 @@ fn hash_join_convert_symmetric_subrule(
let name = schema.field(*index).name();
let col = Arc::new(Column::new(name, *index))
as _;
// Check if the column is ordered.
- equivalence.get_expr_ordering(col).data
+
equivalence.get_expr_properties(col).sort_properties
!= SortProperties::Unordered
},
)
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 0190f35cc9..fe1290e407 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1376,7 +1376,6 @@ mod tests {
)),
],
DataType::Int32,
- None,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
@@ -1442,7 +1441,6 @@ mod tests {
)),
],
DataType::Int32,
- None,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 3))),
@@ -1511,7 +1509,6 @@ mod tests {
)),
],
DataType::Int32,
- None,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
@@ -1577,7 +1574,6 @@ mod tests {
)),
],
DataType::Int32,
- None,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d_new", 3))),
diff --git
a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
index 6c9c3359eb..21ef8a7c21 100644
--- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
@@ -39,12 +39,12 @@ mod sp_repartition_fuzz_tests {
config::SessionConfig, memory_pool::MemoryConsumer,
SendableRecordBatchStream,
};
use datafusion_physical_expr::{
+ equivalence::{EquivalenceClass, EquivalenceProperties},
expressions::{col, Column},
- EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+ PhysicalExpr, PhysicalSortExpr,
};
use test_utils::add_empty_batches;
- use datafusion_physical_expr::equivalence::EquivalenceClass;
use itertools::izip;
use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
@@ -78,7 +78,7 @@ mod sp_repartition_fuzz_tests {
let mut eq_properties =
EquivalenceProperties::new(test_schema.clone());
// Define a and f are aliases
- eq_properties.add_equal_conditions(col_a, col_f);
+ eq_properties.add_equal_conditions(col_a, col_f)?;
// Column e has constant value.
eq_properties = eq_properties.add_constants([col_e.clone()]);
diff --git
a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
index def9fcb4c6..df41cab7bf 100644
--- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
@@ -15,26 +15,27 @@
// specific language governing permissions and limitations
// under the License.
+use std::any::Any;
+use std::sync::Arc;
+
use arrow::compute::kernels::numeric::add;
use arrow_array::{ArrayRef, Float32Array, Float64Array, Int32Array,
RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::context::{FunctionFactory, RegisterFunction,
SessionState};
use datafusion::prelude::*;
use datafusion::{execution::registry::FunctionRegistry, test_util};
+use datafusion_common::cast::{as_float64_array, as_int32_array};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{
- assert_batches_eq, assert_batches_sorted_eq, cast::as_float64_array,
- cast::as_int32_array, not_impl_err, plan_err, ExprSchema, Result,
ScalarValue,
+ assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err,
internal_err,
+ not_impl_err, plan_err, DataFusionError, ExprSchema, Result, ScalarValue,
};
-use datafusion_common::{assert_contains, exec_err, internal_err,
DataFusionError};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::{
Accumulator, ColumnarValue, CreateFunction, ExprSchemable,
LogicalPlanBuilder,
ScalarUDF, ScalarUDFImpl, Signature, Volatility,
};
-use std::any::Any;
-use std::sync::Arc;
/// test that casting happens on udfs.
/// c11 is f32, but `custom_sqrt` requires f64. Casting happens but the
logical plan and
@@ -776,10 +777,6 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
fn aliases(&self) -> &[String] {
&[]
}
-
- fn monotonicity(&self) ->
Result<Option<datafusion_expr::FuncMonotonicity>> {
- Ok(None)
- }
}
impl ScalarFunctionWrapper {
diff --git a/datafusion/expr/src/interval_arithmetic.rs
b/datafusion/expr/src/interval_arithmetic.rs
index ca91a8c9da..c4890b97e7 100644
--- a/datafusion/expr/src/interval_arithmetic.rs
+++ b/datafusion/expr/src/interval_arithmetic.rs
@@ -273,19 +273,34 @@ impl Interval {
unreachable!();
};
// Standardize boolean interval endpoints:
- Self {
+ return Self {
lower: ScalarValue::Boolean(Some(lower_bool.unwrap_or(false))),
upper: ScalarValue::Boolean(Some(upper_bool.unwrap_or(true))),
- }
+ };
}
- // Standardize floating-point endpoints:
- else if lower.data_type() == DataType::Float32 {
- handle_float_intervals!(Float32, f32, lower, upper)
- } else if lower.data_type() == DataType::Float64 {
- handle_float_intervals!(Float64, f64, lower, upper)
- } else {
+ match lower.data_type() {
+ // Standardize floating-point endpoints:
+ DataType::Float32 => handle_float_intervals!(Float32, f32, lower,
upper),
+ DataType::Float64 => handle_float_intervals!(Float64, f64, lower,
upper),
+ // Unsigned null values for lower bounds are set to zero:
+ DataType::UInt8 if lower.is_null() => Self {
+ lower: ScalarValue::UInt8(Some(0)),
+ upper,
+ },
+ DataType::UInt16 if lower.is_null() => Self {
+ lower: ScalarValue::UInt16(Some(0)),
+ upper,
+ },
+ DataType::UInt32 if lower.is_null() => Self {
+ lower: ScalarValue::UInt32(Some(0)),
+ upper,
+ },
+ DataType::UInt64 if lower.is_null() => Self {
+ lower: ScalarValue::UInt64(Some(0)),
+ upper,
+ },
// Other data types do not require standardization:
- Self { lower, upper }
+ _ => Self { lower, upper },
}
}
@@ -299,6 +314,12 @@ impl Interval {
Self::try_new(ScalarValue::from(lower), ScalarValue::from(upper))
}
+ /// Creates a singleton zero interval if the datatype is supported.
+ pub fn make_zero(data_type: &DataType) -> Result<Self> {
+ let zero_endpoint = ScalarValue::new_zero(data_type)?;
+ Ok(Self::new(zero_endpoint.clone(), zero_endpoint))
+ }
+
/// Creates an unbounded interval from both sides if the datatype
supported.
pub fn make_unbounded(data_type: &DataType) -> Result<Self> {
let unbounded_endpoint = ScalarValue::try_from(data_type)?;
@@ -369,7 +390,7 @@ impl Interval {
/// NOTE: This function only works with intervals of the same data type.
/// Attempting to compare intervals of different data types will lead
/// to an error.
- pub(crate) fn gt<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+ pub fn gt<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
let rhs = other.borrow();
if self.data_type().ne(&rhs.data_type()) {
internal_err!(
@@ -402,7 +423,7 @@ impl Interval {
/// NOTE: This function only works with intervals of the same data type.
/// Attempting to compare intervals of different data types will lead
/// to an error.
- pub(crate) fn gt_eq<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+ pub fn gt_eq<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
let rhs = other.borrow();
if self.data_type().ne(&rhs.data_type()) {
internal_err!(
@@ -435,7 +456,7 @@ impl Interval {
/// NOTE: This function only works with intervals of the same data type.
/// Attempting to compare intervals of different data types will lead
/// to an error.
- pub(crate) fn lt<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+ pub fn lt<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
other.borrow().gt(self)
}
@@ -446,7 +467,7 @@ impl Interval {
/// NOTE: This function only works with intervals of the same data type.
/// Attempting to compare intervals of different data types will lead
/// to an error.
- pub(crate) fn lt_eq<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+ pub fn lt_eq<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
other.borrow().gt_eq(self)
}
@@ -457,7 +478,7 @@ impl Interval {
/// NOTE: This function only works with intervals of the same data type.
/// Attempting to compare intervals of different data types will lead
/// to an error.
- pub(crate) fn equal<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+ pub fn equal<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
let rhs = other.borrow();
if get_result_type(&self.data_type(), &Operator::Eq,
&rhs.data_type()).is_err() {
internal_err!(
@@ -480,7 +501,7 @@ impl Interval {
/// Compute the logical conjunction of this (boolean) interval with the
/// given boolean interval.
- pub(crate) fn and<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+ pub fn and<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
let rhs = other.borrow();
match (&self.lower, &self.upper, &rhs.lower, &rhs.upper) {
(
@@ -501,8 +522,31 @@ impl Interval {
}
}
+ /// Compute the logical disjunction of this boolean interval with the
+ /// given boolean interval.
+ pub fn or<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+ let rhs = other.borrow();
+ match (&self.lower, &self.upper, &rhs.lower, &rhs.upper) {
+ (
+ &ScalarValue::Boolean(Some(self_lower)),
+ &ScalarValue::Boolean(Some(self_upper)),
+ &ScalarValue::Boolean(Some(other_lower)),
+ &ScalarValue::Boolean(Some(other_upper)),
+ ) => {
+ let lower = self_lower || other_lower;
+ let upper = self_upper || other_upper;
+
+ Ok(Self {
+ lower: ScalarValue::Boolean(Some(lower)),
+ upper: ScalarValue::Boolean(Some(upper)),
+ })
+ }
+ _ => internal_err!("Incompatible data types for logical
conjunction"),
+ }
+ }
+
/// Compute the logical negation of this (boolean) interval.
- pub(crate) fn not(&self) -> Result<Self> {
+ pub fn not(&self) -> Result<Self> {
if self.data_type().ne(&DataType::Boolean) {
internal_err!("Cannot apply logical negation to a non-boolean
interval")
} else if self == &Self::CERTAINLY_TRUE {
@@ -761,6 +805,18 @@ impl Interval {
}
.map(|result| result + 1)
}
+
+ /// Reflects an [`Interval`] around the point zero.
+ ///
+ /// This method computes the arithmetic negation of the interval,
reflecting
+ /// it about the origin of the number line. This operation swaps and
negates
+ /// the lower and upper bounds of the interval.
+ pub fn arithmetic_negate(self) -> Result<Self> {
+ Ok(Self {
+ lower: self.upper().clone().arithmetic_negate()?,
+ upper: self.lower().clone().arithmetic_negate()?,
+ })
+ }
}
impl Display for Interval {
@@ -1885,10 +1941,10 @@ mod tests {
let unbounded_cases = vec![
(DataType::Boolean, Boolean(Some(false)), Boolean(Some(true))),
- (DataType::UInt8, UInt8(None), UInt8(None)),
- (DataType::UInt16, UInt16(None), UInt16(None)),
- (DataType::UInt32, UInt32(None), UInt32(None)),
- (DataType::UInt64, UInt64(None), UInt64(None)),
+ (DataType::UInt8, UInt8(Some(0)), UInt8(None)),
+ (DataType::UInt16, UInt16(Some(0)), UInt16(None)),
+ (DataType::UInt32, UInt32(Some(0)), UInt32(None)),
+ (DataType::UInt64, UInt64(Some(0)), UInt64(None)),
(DataType::Int8, Int8(None), Int8(None)),
(DataType::Int16, Int16(None), Int16(None)),
(DataType::Int32, Int32(None), Int32(None)),
@@ -1994,6 +2050,10 @@ mod tests {
Interval::make(None, Some(1000_i64))?,
Interval::make(Some(1000_i64), Some(1500_i64))?,
),
+ (
+ Interval::make(Some(0_u8), Some(0_u8))?,
+ Interval::make::<u8>(None, None)?,
+ ),
(
Interval::try_new(
prev_value(ScalarValue::Float32(Some(0.0_f32))),
@@ -2036,6 +2096,10 @@ mod tests {
Interval::make(Some(-1000_i64), Some(1000_i64))?,
Interval::make(None, Some(-1500_i64))?,
),
+ (
+ Interval::make::<u64>(None, None)?,
+ Interval::make(Some(0_u64), Some(0_u64))?,
+ ),
(
Interval::make(Some(0.0_f32), Some(0.0_f32))?,
Interval::make(Some(0.0_f32), Some(0.0_f32))?,
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index e2b68388ab..bac2f9c145 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -50,6 +50,7 @@ pub mod groups_accumulator;
pub mod interval_arithmetic;
pub mod logical_plan;
pub mod simplify;
+pub mod sort_properties;
pub mod tree_node;
pub mod type_coercion;
pub mod utils;
@@ -77,8 +78,7 @@ pub use logical_plan::*;
pub use operator::Operator;
pub use partition_evaluator::PartitionEvaluator;
pub use signature::{
- ArrayFunctionSignature, FuncMonotonicity, Signature, TypeSignature,
Volatility,
- TIMEZONE_WILDCARD,
+ ArrayFunctionSignature, Signature, TypeSignature, Volatility,
TIMEZONE_WILDCARD,
};
pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
pub use udaf::{AggregateUDF, AggregateUDFImpl};
diff --git a/datafusion/expr/src/signature.rs b/datafusion/expr/src/signature.rs
index 5d925c8605..63b030f0b7 100644
--- a/datafusion/expr/src/signature.rs
+++ b/datafusion/expr/src/signature.rs
@@ -343,14 +343,6 @@ impl Signature {
}
}
-/// Monotonicity of the `ScalarFunctionExpr` with respect to its arguments.
-/// Each element of this vector corresponds to an argument and indicates
whether
-/// the function's behavior is monotonic, or non-monotonic/unknown for that
argument, namely:
-/// - `None` signifies unknown monotonicity or non-monotonicity.
-/// - `Some(true)` indicates that the function is monotonically increasing
w.r.t. the argument in question.
-/// - Some(false) indicates that the function is monotonically decreasing
w.r.t. the argument in question.
-pub type FuncMonotonicity = Vec<Option<bool>>;
-
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/physical-expr-common/src/sort_properties.rs
b/datafusion/expr/src/sort_properties.rs
similarity index 77%
rename from datafusion/physical-expr-common/src/sort_properties.rs
rename to datafusion/expr/src/sort_properties.rs
index 47a5d5ba5e..7778be2ecf 100644
--- a/datafusion/physical-expr-common/src/sort_properties.rs
+++ b/datafusion/expr/src/sort_properties.rs
@@ -17,9 +17,10 @@
use std::ops::Neg;
-use arrow::compute::SortOptions;
+use crate::interval_arithmetic::Interval;
-use crate::tree_node::ExprContext;
+use arrow::compute::SortOptions;
+use arrow::datatypes::DataType;
/// To propagate [`SortOptions`] across the `PhysicalExpr`, it is insufficient
/// to simply use `Option<SortOptions>`: There must be a differentiation
between
@@ -120,29 +121,39 @@ impl SortProperties {
impl Neg for SortProperties {
type Output = Self;
- fn neg(self) -> Self::Output {
- match self {
- SortProperties::Ordered(SortOptions {
- descending,
- nulls_first,
- }) => SortProperties::Ordered(SortOptions {
- descending: !descending,
- nulls_first,
- }),
- SortProperties::Singleton => SortProperties::Singleton,
- SortProperties::Unordered => SortProperties::Unordered,
+ fn neg(mut self) -> Self::Output {
+ if let SortProperties::Ordered(SortOptions { descending, .. }) = &mut
self {
+ *descending = !*descending;
}
+ self
}
}
-/// The `ExprOrdering` struct is designed to aid in the determination of
ordering (represented
-/// by [`SortProperties`]) for a given `PhysicalExpr`. When analyzing the
orderings
-/// of a `PhysicalExpr`, the process begins by assigning the ordering of its
leaf nodes.
-/// By propagating these leaf node orderings upwards in the expression tree,
the overall
-/// ordering of the entire `PhysicalExpr` can be derived.
-///
-/// This struct holds the necessary state information for each expression in
the `PhysicalExpr`.
-/// It encapsulates the orderings (`data`) associated with the expression
(`expr`), and
-/// orderings of the children expressions (`children`). The [`ExprOrdering`]
of a parent
-/// expression is determined based on the [`ExprOrdering`] states of its
children expressions.
-pub type ExprOrdering = ExprContext<SortProperties>;
+/// Represents the properties of a `PhysicalExpr`, including its sorting and
range attributes.
+#[derive(Debug, Clone)]
+pub struct ExprProperties {
+ pub sort_properties: SortProperties,
+ pub range: Interval,
+}
+
+impl ExprProperties {
+ /// Creates a new `ExprProperties` instance with unknown sort properties
and unknown range.
+ pub fn new_unknown() -> Self {
+ Self {
+ sort_properties: SortProperties::default(),
+ range: Interval::make_unbounded(&DataType::Null).unwrap(),
+ }
+ }
+
+ /// Sets the sorting properties of the expression and returns the modified
instance.
+ pub fn with_order(mut self, order: SortProperties) -> Self {
+ self.sort_properties = order;
+ self
+ }
+
+ /// Sets the range of the expression and returns the modified instance.
+ pub fn with_range(mut self, range: Interval) -> Self {
+ self.range = range;
+ self
+ }
+}
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index fadea26e7f..921d13ab35 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -17,19 +17,20 @@
//! [`ScalarUDF`]: Scalar User Defined Functions
+use std::any::Any;
+use std::fmt::{self, Debug, Formatter};
+use std::sync::Arc;
+
use crate::expr::create_name;
+use crate::interval_arithmetic::Interval;
use crate::simplify::{ExprSimplifyResult, SimplifyInfo};
+use crate::sort_properties::{ExprProperties, SortProperties};
use crate::{
- ColumnarValue, Expr, FuncMonotonicity, ReturnTypeFunction,
- ScalarFunctionImplementation, Signature,
+ ColumnarValue, Expr, ReturnTypeFunction, ScalarFunctionImplementation,
Signature,
};
+
use arrow::datatypes::DataType;
use datafusion_common::{not_impl_err, ExprSchema, Result};
-use std::any::Any;
-use std::fmt;
-use std::fmt::Debug;
-use std::fmt::Formatter;
-use std::sync::Arc;
/// Logical representation of a Scalar User Defined Function.
///
@@ -202,18 +203,63 @@ impl ScalarUDF {
Arc::new(move |args| captured.invoke(args))
}
- /// This function specifies monotonicity behaviors for User defined scalar
functions.
- ///
- /// See [`ScalarUDFImpl::monotonicity`] for more details.
- pub fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
- self.inner.monotonicity()
- }
-
/// Get the circuits of inner implementation
pub fn short_circuits(&self) -> bool {
self.inner.short_circuits()
}
+ /// Computes the output interval for a [`ScalarUDF`], given the input
+ /// intervals.
+ ///
+ /// # Parameters
+ ///
+ /// * `inputs` are the intervals for the inputs (children) of this
function.
+ ///
+ /// # Example
+ ///
+ /// If the function is `ABS(a)`, and the input interval is `a: [-3, 2]`,
+ /// then the output interval would be `[0, 3]`.
+ pub fn evaluate_bounds(&self, inputs: &[&Interval]) -> Result<Interval> {
+ self.inner.evaluate_bounds(inputs)
+ }
+
+ /// Updates bounds for child expressions, given a known interval for this
+ /// function. This is used to propagate constraints down through an
expression
+ /// tree.
+ ///
+ /// # Parameters
+ ///
+ /// * `interval` is the currently known interval for this function.
+ /// * `inputs` are the current intervals for the inputs (children) of this
function.
+ ///
+ /// # Returns
+ ///
+ /// A `Vec` of new intervals for the children, in order.
+ ///
+ /// If constraint propagation reveals an infeasibility for any child,
returns
+ /// [`None`]. If none of the children intervals change as a result of
+ /// propagation, may return an empty vector instead of cloning `inputs`.
+ /// This is the default (and conservative) return value.
+ ///
+ /// # Example
+ ///
+ /// If the function is `ABS(a)`, the current `interval` is `[4, 5]` and the
+ /// input `a` is given as `[-7, 3]`, then propagation would return
+ /// `[-5, 3]`.
+ pub fn propagate_constraints(
+ &self,
+ interval: &Interval,
+ inputs: &[&Interval],
+ ) -> Result<Option<Vec<Interval>>> {
+ self.inner.propagate_constraints(interval, inputs)
+ }
+
+ /// Calculates the [`SortProperties`] of this function based on its
+ /// children's properties.
+ pub fn monotonicity(&self, inputs: &[ExprProperties]) ->
Result<SortProperties> {
+ self.inner.monotonicity(inputs)
+ }
+
/// See [`ScalarUDFImpl::coerce_types`] for more details.
pub fn coerce_types(&self, arg_types: &[DataType]) ->
Result<Vec<DataType>> {
self.inner.coerce_types(arg_types)
@@ -387,11 +433,6 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
&[]
}
- /// This function specifies monotonicity behaviors for User defined scalar
functions.
- fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
- Ok(None)
- }
-
/// Optionally apply per-UDF simplification / rewrite rules.
///
/// This can be used to apply function specific simplification rules during
@@ -426,6 +467,59 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
false
}
+ /// Computes the output interval for a [`ScalarUDFImpl`], given the input
+ /// intervals.
+ ///
+ /// # Parameters
+ ///
+ /// * `input` are the intervals for the inputs (children) of this
function.
+ ///
+ /// # Example
+ ///
+ /// If the function is `ABS(a)`, and the input interval is `a: [-3, 2]`,
+ /// then the output interval would be `[0, 3]`.
+ fn evaluate_bounds(&self, _input: &[&Interval]) -> Result<Interval> {
+ // We cannot assume the input datatype is the same as the output type.
+ Interval::make_unbounded(&DataType::Null)
+ }
+
+ /// Updates bounds for child expressions, given a known interval for this
+ /// function. This is used to propagate constraints down through an
expression
+ /// tree.
+ ///
+ /// # Parameters
+ ///
+ /// * `interval` is the currently known interval for this function.
+ /// * `inputs` are the current intervals for the inputs (children) of this
function.
+ ///
+ /// # Returns
+ ///
+ /// A `Vec` of new intervals for the children, in order.
+ ///
+ /// If constraint propagation reveals an infeasibility for any child,
returns
+ /// [`None`]. If none of the children intervals change as a result of
+ /// propagation, may return an empty vector instead of cloning `inputs`.
+ /// This is the default (and conservative) return value.
+ ///
+ /// # Example
+ ///
+ /// If the function is `ABS(a)`, the current `interval` is `[4, 5]` and the
+ /// input `a` is given as `[-7, 3]`, then propagation would return
+ /// `[-5, 3]`.
+ fn propagate_constraints(
+ &self,
+ _interval: &Interval,
+ _inputs: &[&Interval],
+ ) -> Result<Option<Vec<Interval>>> {
+ Ok(Some(vec![]))
+ }
+
+ /// Calculates the [`SortProperties`] of this function based on its
+ /// children's properties.
+ fn monotonicity(&self, _inputs: &[ExprProperties]) ->
Result<SortProperties> {
+ Ok(SortProperties::Unordered)
+ }
+
/// Coerce arguments of a function call to types that the function can
evaluate.
///
/// This function is only called if [`ScalarUDFImpl::signature`] returns
[`crate::TypeSignature::UserDefined`]. Most
diff --git a/datafusion/functions/src/datetime/date_bin.rs
b/datafusion/functions/src/datetime/date_bin.rs
index da1797cdae..51f5c09a06 100644
--- a/datafusion/functions/src/datetime/date_bin.rs
+++ b/datafusion/functions/src/datetime/date_bin.rs
@@ -29,16 +29,17 @@ use arrow::datatypes::DataType::{Null, Timestamp, Utf8};
use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
use arrow::datatypes::{DataType, TimeUnit};
-use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
use datafusion_common::cast::as_primitive_array;
use datafusion_common::{exec_err, not_impl_err, plan_err, Result, ScalarValue};
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::TypeSignature::Exact;
use datafusion_expr::{
- ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility,
- TIMEZONE_WILDCARD,
+ ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
};
+use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
+
#[derive(Debug)]
pub struct DateBinFunc {
signature: Signature,
@@ -146,8 +147,21 @@ impl ScalarUDFImpl for DateBinFunc {
}
}
- fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
- Ok(Some(vec![None, Some(true)]))
+ fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties>
{
+ // The DATE_BIN function preserves the order of its second argument.
+ let step = &input[0];
+ let date_value = &input[1];
+ let reference = input.get(2);
+
+ if step.sort_properties.eq(&SortProperties::Singleton)
+ && reference
+ .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
+ .unwrap_or(true)
+ {
+ Ok(date_value.sort_properties)
+ } else {
+ Ok(SortProperties::Unordered)
+ }
}
}
@@ -425,16 +439,16 @@ fn date_bin_impl(
mod tests {
use std::sync::Arc;
+ use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc};
use arrow::array::types::TimestampNanosecondType;
use arrow::array::{IntervalDayTimeArray, TimestampNanosecondArray};
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow::datatypes::{DataType, TimeUnit};
- use chrono::TimeDelta;
use datafusion_common::ScalarValue;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
- use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc};
+ use chrono::TimeDelta;
#[test]
fn test_date_bin() {
diff --git a/datafusion/functions/src/datetime/date_trunc.rs
b/datafusion/functions/src/datetime/date_trunc.rs
index 0414bf9c2a..ba5db567a0 100644
--- a/datafusion/functions/src/datetime/date_trunc.rs
+++ b/datafusion/functions/src/datetime/date_trunc.rs
@@ -29,19 +29,18 @@ use arrow::array::types::{
TimestampNanosecondType, TimestampSecondType,
};
use arrow::array::{Array, PrimitiveArray};
-use arrow::datatypes::DataType::{Null, Timestamp, Utf8};
-use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
-use arrow::datatypes::{DataType, TimeUnit};
-use chrono::{
- DateTime, Datelike, Duration, LocalResult, NaiveDateTime, Offset,
TimeDelta, Timelike,
-};
-
+use arrow::datatypes::DataType::{self, Null, Timestamp, Utf8};
+use arrow::datatypes::TimeUnit::{self, Microsecond, Millisecond, Nanosecond,
Second};
use datafusion_common::cast::as_primitive_array;
use datafusion_common::{exec_err, plan_err, DataFusionError, Result,
ScalarValue};
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::TypeSignature::Exact;
use datafusion_expr::{
- ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility,
- TIMEZONE_WILDCARD,
+ ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
+};
+
+use chrono::{
+ DateTime, Datelike, Duration, LocalResult, NaiveDateTime, Offset,
TimeDelta, Timelike,
};
#[derive(Debug)]
@@ -205,8 +204,16 @@ impl ScalarUDFImpl for DateTruncFunc {
&self.aliases
}
- fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
- Ok(Some(vec![None, Some(true)]))
+ fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties>
{
+ // The DATE_TRUNC function preserves the order of its second argument.
+ let precision = &input[0];
+ let date_value = &input[1];
+
+ if precision.sort_properties.eq(&SortProperties::Singleton) {
+ Ok(date_value.sort_properties)
+ } else {
+ Ok(SortProperties::Unordered)
+ }
}
}
@@ -410,7 +417,10 @@ fn parse_tz(tz: &Option<Arc<str>>) -> Result<Option<Tz>> {
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
use crate::datetime::date_trunc::{date_trunc_coarse, DateTruncFunc};
+
use arrow::array::cast::as_primitive_array;
use arrow::array::types::TimestampNanosecondType;
use arrow::array::TimestampNanosecondArray;
@@ -418,7 +428,6 @@ mod tests {
use arrow::datatypes::{DataType, TimeUnit};
use datafusion_common::ScalarValue;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
- use std::sync::Arc;
#[test]
fn date_trunc_test() {
diff --git a/datafusion/functions/src/macros.rs
b/datafusion/functions/src/macros.rs
index 5ee47bd3e8..2f14e881d1 100644
--- a/datafusion/functions/src/macros.rs
+++ b/datafusion/functions/src/macros.rs
@@ -89,7 +89,6 @@ macro_rules! make_udf_function {
/// The rationale for providing stub functions is to help users to configure
datafusion
/// properly (so they get an error telling them why a function is not
available)
/// instead of getting a cryptic "no function found" message at runtime.
-
macro_rules! make_stub_package {
($name:ident, $feature:literal) => {
#[cfg(not(feature = $feature))]
@@ -115,7 +114,6 @@ macro_rules! make_stub_package {
/// $ARGS_TYPE: the type of array to cast the argument to
/// $RETURN_TYPE: the type of array to return
/// $FUNC: the function to apply to each element of $ARG
-///
macro_rules! make_function_scalar_inputs_return_type {
($ARG: expr, $NAME:expr, $ARG_TYPE:ident, $RETURN_TYPE:ident, $FUNC:
block) => {{
let arg = downcast_arg!($ARG, $NAME, $ARG_TYPE);
@@ -162,14 +160,14 @@ macro_rules! make_math_unary_udf {
make_udf_function!($NAME::$UDF, $GNAME, $NAME);
mod $NAME {
+ use std::any::Any;
+ use std::sync::Arc;
+
use arrow::array::{ArrayRef, Float32Array, Float64Array};
use arrow::datatypes::DataType;
use datafusion_common::{exec_err, DataFusionError, Result};
- use datafusion_expr::{
- ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature,
Volatility,
- };
- use std::any::Any;
- use std::sync::Arc;
+ use datafusion_expr::sort_properties::{ExprProperties,
SortProperties};
+ use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature,
Volatility};
#[derive(Debug)]
pub struct $UDF {
@@ -211,8 +209,11 @@ macro_rules! make_math_unary_udf {
}
}
- fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
- Ok($MONOTONICITY)
+ fn monotonicity(
+ &self,
+ input: &[ExprProperties],
+ ) -> Result<SortProperties> {
+ $MONOTONICITY(input)
}
fn invoke(&self, args: &[ColumnarValue]) ->
Result<ColumnarValue> {
@@ -266,15 +267,15 @@ macro_rules! make_math_binary_udf {
make_udf_function!($NAME::$UDF, $GNAME, $NAME);
mod $NAME {
+ use std::any::Any;
+ use std::sync::Arc;
+
use arrow::array::{ArrayRef, Float32Array, Float64Array};
use arrow::datatypes::DataType;
use datafusion_common::{exec_err, DataFusionError, Result};
+ use datafusion_expr::sort_properties::{ExprProperties,
SortProperties};
use datafusion_expr::TypeSignature::*;
- use datafusion_expr::{
- ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature,
Volatility,
- };
- use std::any::Any;
- use std::sync::Arc;
+ use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature,
Volatility};
#[derive(Debug)]
pub struct $UDF {
@@ -318,8 +319,11 @@ macro_rules! make_math_binary_udf {
}
}
- fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
- Ok($MONOTONICITY)
+ fn monotonicity(
+ &self,
+ input: &[ExprProperties],
+ ) -> Result<SortProperties> {
+ $MONOTONICITY(input)
}
fn invoke(&self, args: &[ColumnarValue]) ->
Result<ColumnarValue> {
diff --git a/datafusion/functions/src/math/abs.rs
b/datafusion/functions/src/math/abs.rs
index e05dc86652..a752102913 100644
--- a/datafusion/functions/src/math/abs.rs
+++ b/datafusion/functions/src/math/abs.rs
@@ -17,23 +17,20 @@
//! math expressions
-use arrow::array::Decimal128Array;
-use arrow::array::Decimal256Array;
-use arrow::array::Int16Array;
-use arrow::array::Int32Array;
-use arrow::array::Int64Array;
-use arrow::array::Int8Array;
-use arrow::datatypes::DataType;
-use datafusion_common::{exec_err, not_impl_err};
-use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::ColumnarValue;
-
-use arrow::array::{ArrayRef, Float32Array, Float64Array};
-use arrow::error::ArrowError;
-use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use std::any::Any;
use std::sync::Arc;
+use arrow::array::{
+ ArrayRef, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
Int16Array,
+ Int32Array, Int64Array, Int8Array,
+};
+use arrow::datatypes::DataType;
+use arrow::error::ArrowError;
+use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+
type MathArrayFunction = fn(&Vec<ArrayRef>) -> Result<ArrayRef>;
macro_rules! make_abs_function {
@@ -170,7 +167,21 @@ impl ScalarUDFImpl for AbsFunc {
let input_data_type = args[0].data_type();
let abs_fun = create_abs_function(input_data_type)?;
- let arr = abs_fun(&args)?;
- Ok(ColumnarValue::Array(arr))
+ abs_fun(&args).map(ColumnarValue::Array)
+ }
+
+ fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties>
{
+ // Non-decreasing for x ≥ 0 and symmetrically non-increasing for x ≤ 0.
+ let arg = &input[0];
+ let range = &arg.range;
+ let zero_point = Interval::make_zero(&range.lower().data_type())?;
+
+ if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+ Ok(arg.sort_properties)
+ } else if range.lt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+ Ok(-arg.sort_properties)
+ } else {
+ Ok(SortProperties::Unordered)
+ }
}
}
diff --git a/datafusion/functions/src/math/log.rs
b/datafusion/functions/src/math/log.rs
index e6c698ad1a..8c1e8ac8fe 100644
--- a/datafusion/functions/src/math/log.rs
+++ b/datafusion/functions/src/math/log.rs
@@ -17,6 +17,12 @@
//! Math function: `log()`.
+use std::any::Any;
+use std::sync::Arc;
+
+use super::power::PowerFunc;
+
+use arrow::array::{ArrayRef, Float32Array, Float64Array};
use arrow::datatypes::DataType;
use datafusion_common::{
exec_err, internal_err, plan_datafusion_err, plan_err, DataFusionError,
Result,
@@ -24,15 +30,9 @@ use datafusion_common::{
};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
-use datafusion_expr::{lit, ColumnarValue, Expr, FuncMonotonicity, ScalarUDF};
-
-use arrow::array::{ArrayRef, Float32Array, Float64Array};
-use datafusion_expr::TypeSignature::*;
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+use datafusion_expr::{lit, ColumnarValue, Expr, ScalarUDF, TypeSignature::*};
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
-use std::any::Any;
-use std::sync::Arc;
-
-use super::power::PowerFunc;
#[derive(Debug)]
pub struct LogFunc {
@@ -81,8 +81,23 @@ impl ScalarUDFImpl for LogFunc {
}
}
- fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
- Ok(Some(vec![Some(true), Some(false)]))
+ fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties>
{
+ match (input[0].sort_properties, input[1].sort_properties) {
+ (first @ SortProperties::Ordered(value),
SortProperties::Ordered(base))
+ if !value.descending && base.descending
+ || value.descending && !base.descending =>
+ {
+ Ok(first)
+ }
+ (
+ first @ (SortProperties::Ordered(_) |
SortProperties::Singleton),
+ SortProperties::Singleton,
+ ) => Ok(first),
+ (SortProperties::Singleton, second @ SortProperties::Ordered(_))
=> {
+ Ok(-second)
+ }
+ _ => Ok(SortProperties::Unordered),
+ }
}
// Support overloaded log(base, x) and log(x) which defaults to log(10, x)
@@ -213,14 +228,13 @@ fn is_pow(func: &ScalarUDF) -> bool {
mod tests {
use std::collections::HashMap;
- use datafusion_common::{
- cast::{as_float32_array, as_float64_array},
- DFSchema,
- };
- use datafusion_expr::{execution_props::ExecutionProps,
simplify::SimplifyContext};
-
use super::*;
+ use datafusion_common::cast::{as_float32_array, as_float64_array};
+ use datafusion_common::DFSchema;
+ use datafusion_expr::execution_props::ExecutionProps;
+ use datafusion_expr::simplify::SimplifyContext;
+
#[test]
fn test_log_f64() {
let args = [
diff --git a/datafusion/functions/src/math/mod.rs
b/datafusion/functions/src/math/mod.rs
index b6e8d26b64..6c26ce79d0 100644
--- a/datafusion/functions/src/math/mod.rs
+++ b/datafusion/functions/src/math/mod.rs
@@ -17,9 +17,12 @@
//! "math" DataFusion functions
-use datafusion_expr::ScalarUDF;
use std::sync::Arc;
+use crate::math::monotonicity::*;
+
+use datafusion_expr::ScalarUDF;
+
pub mod abs;
pub mod cot;
pub mod factorial;
@@ -27,6 +30,7 @@ pub mod gcd;
pub mod iszero;
pub mod lcm;
pub mod log;
+pub mod monotonicity;
pub mod nans;
pub mod nanvl;
pub mod pi;
@@ -37,42 +41,60 @@ pub mod trunc;
// Create UDFs
make_udf_function!(abs::AbsFunc, ABS, abs);
-make_math_unary_udf!(AcosFunc, ACOS, acos, acos, None);
-make_math_unary_udf!(AcoshFunc, ACOSH, acosh, acosh, Some(vec![Some(true)]));
-make_math_unary_udf!(AsinFunc, ASIN, asin, asin, None);
-make_math_unary_udf!(AsinhFunc, ASINH, asinh, asinh, Some(vec![Some(true)]));
-make_math_unary_udf!(AtanFunc, ATAN, atan, atan, Some(vec![Some(true)]));
-make_math_unary_udf!(AtanhFunc, ATANH, atanh, atanh, Some(vec![Some(true)]));
-make_math_binary_udf!(Atan2, ATAN2, atan2, atan2, Some(vec![Some(true)]));
-make_math_unary_udf!(CbrtFunc, CBRT, cbrt, cbrt, None);
-make_math_unary_udf!(CeilFunc, CEIL, ceil, ceil, Some(vec![Some(true)]));
-make_math_unary_udf!(CosFunc, COS, cos, cos, None);
-make_math_unary_udf!(CoshFunc, COSH, cosh, cosh, None);
+make_math_unary_udf!(AcosFunc, ACOS, acos, acos, super::acos_monotonicity);
+make_math_unary_udf!(AcoshFunc, ACOSH, acosh, acosh,
super::acosh_monotonicity);
+make_math_unary_udf!(AsinFunc, ASIN, asin, asin, super::asin_monotonicity);
+make_math_unary_udf!(AsinhFunc, ASINH, asinh, asinh,
super::asinh_monotonicity);
+make_math_unary_udf!(AtanFunc, ATAN, atan, atan, super::atan_monotonicity);
+make_math_unary_udf!(AtanhFunc, ATANH, atanh, atanh,
super::atanh_monotonicity);
+make_math_binary_udf!(Atan2, ATAN2, atan2, atan2, super::atan2_monotonicity);
+make_math_unary_udf!(CbrtFunc, CBRT, cbrt, cbrt, super::cbrt_monotonicity);
+make_math_unary_udf!(CeilFunc, CEIL, ceil, ceil, super::ceil_monotonicity);
+make_math_unary_udf!(CosFunc, COS, cos, cos, super::cos_monotonicity);
+make_math_unary_udf!(CoshFunc, COSH, cosh, cosh, super::cosh_monotonicity);
make_udf_function!(cot::CotFunc, COT, cot);
-make_math_unary_udf!(DegreesFunc, DEGREES, degrees, to_degrees, None);
-make_math_unary_udf!(ExpFunc, EXP, exp, exp, Some(vec![Some(true)]));
+make_math_unary_udf!(
+ DegreesFunc,
+ DEGREES,
+ degrees,
+ to_degrees,
+ super::degrees_monotonicity
+);
+make_math_unary_udf!(ExpFunc, EXP, exp, exp, super::exp_monotonicity);
make_udf_function!(factorial::FactorialFunc, FACTORIAL, factorial);
-make_math_unary_udf!(FloorFunc, FLOOR, floor, floor, Some(vec![Some(true)]));
+make_math_unary_udf!(FloorFunc, FLOOR, floor, floor,
super::floor_monotonicity);
make_udf_function!(log::LogFunc, LOG, log);
make_udf_function!(gcd::GcdFunc, GCD, gcd);
make_udf_function!(nans::IsNanFunc, ISNAN, isnan);
make_udf_function!(iszero::IsZeroFunc, ISZERO, iszero);
make_udf_function!(lcm::LcmFunc, LCM, lcm);
-make_math_unary_udf!(LnFunc, LN, ln, ln, Some(vec![Some(true)]));
-make_math_unary_udf!(Log2Func, LOG2, log2, log2, Some(vec![Some(true)]));
-make_math_unary_udf!(Log10Func, LOG10, log10, log10, Some(vec![Some(true)]));
+make_math_unary_udf!(LnFunc, LN, ln, ln, super::ln_monotonicity);
+make_math_unary_udf!(Log2Func, LOG2, log2, log2, super::log2_monotonicity);
+make_math_unary_udf!(Log10Func, LOG10, log10, log10,
super::log10_monotonicity);
make_udf_function!(nanvl::NanvlFunc, NANVL, nanvl);
make_udf_function!(pi::PiFunc, PI, pi);
make_udf_function!(power::PowerFunc, POWER, power);
-make_math_unary_udf!(RadiansFunc, RADIANS, radians, to_radians, None);
+make_math_unary_udf!(
+ RadiansFunc,
+ RADIANS,
+ radians,
+ to_radians,
+ super::radians_monotonicity
+);
make_udf_function!(random::RandomFunc, RANDOM, random);
make_udf_function!(round::RoundFunc, ROUND, round);
-make_math_unary_udf!(SignumFunc, SIGNUM, signum, signum, None);
-make_math_unary_udf!(SinFunc, SIN, sin, sin, None);
-make_math_unary_udf!(SinhFunc, SINH, sinh, sinh, None);
-make_math_unary_udf!(SqrtFunc, SQRT, sqrt, sqrt, None);
-make_math_unary_udf!(TanFunc, TAN, tan, tan, None);
-make_math_unary_udf!(TanhFunc, TANH, tanh, tanh, None);
+make_math_unary_udf!(
+ SignumFunc,
+ SIGNUM,
+ signum,
+ signum,
+ super::signum_monotonicity
+);
+make_math_unary_udf!(SinFunc, SIN, sin, sin, super::sin_monotonicity);
+make_math_unary_udf!(SinhFunc, SINH, sinh, sinh, super::sinh_monotonicity);
+make_math_unary_udf!(SqrtFunc, SQRT, sqrt, sqrt, super::sqrt_monotonicity);
+make_math_unary_udf!(TanFunc, TAN, tan, tan, super::tan_monotonicity);
+make_math_unary_udf!(TanhFunc, TANH, tanh, tanh, super::tanh_monotonicity);
make_udf_function!(trunc::TruncFunc, TRUNC, trunc);
pub mod expr_fn {
diff --git a/datafusion/functions/src/math/monotonicity.rs
b/datafusion/functions/src/math/monotonicity.rs
new file mode 100644
index 0000000000..5ce5654ae7
--- /dev/null
+++ b/datafusion/functions/src/math/monotonicity.rs
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::DataType;
+use datafusion_common::{exec_err, Result, ScalarValue};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+
+fn symmetric_unit_interval(data_type: &DataType) -> Result<Interval> {
+ Interval::try_new(
+ ScalarValue::new_negative_one(data_type)?,
+ ScalarValue::new_one(data_type)?,
+ )
+}
+
+/// Non-increasing on the interval \[−1, 1\], undefined otherwise.
+pub fn acos_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ let arg = &input[0];
+ let range = &arg.range;
+
+ let valid_domain = symmetric_unit_interval(&range.lower().data_type())?;
+
+ if valid_domain.contains(range)? == Interval::CERTAINLY_TRUE {
+ Ok(-arg.sort_properties)
+ } else {
+ exec_err!("Input range of ACOS contains out-of-domain values")
+ }
+}
+
+/// Non-decreasing for x ≥ 1, undefined otherwise.
+pub fn acosh_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ let arg = &input[0];
+ let range = &arg.range;
+
+ let valid_domain = Interval::try_new(
+ ScalarValue::new_one(&range.lower().data_type())?,
+ ScalarValue::try_from(&range.upper().data_type())?,
+ )?;
+
+ if valid_domain.contains(range)? == Interval::CERTAINLY_TRUE {
+ Ok(arg.sort_properties)
+ } else {
+ exec_err!("Input range of ACOSH contains out-of-domain values")
+ }
+}
+
+/// Non-decreasing on the interval \[−1, 1\], undefined otherwise.
+pub fn asin_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ let arg = &input[0];
+ let range = &arg.range;
+
+ let valid_domain = symmetric_unit_interval(&range.lower().data_type())?;
+
+ if valid_domain.contains(range)? == Interval::CERTAINLY_TRUE {
+ Ok(arg.sort_properties)
+ } else {
+ exec_err!("Input range of ASIN contains out-of-domain values")
+ }
+}
+
+/// Non-decreasing for all real numbers.
+pub fn asinh_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn atan_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing on the interval \[−1, 1\], undefined otherwise.
+pub fn atanh_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ let arg = &input[0];
+ let range = &arg.range;
+
+ let valid_domain = symmetric_unit_interval(&range.lower().data_type())?;
+
+ if valid_domain.contains(range)? == Interval::CERTAINLY_TRUE {
+ Ok(arg.sort_properties)
+ } else {
+ exec_err!("Input range of ATANH contains out-of-domain values")
+ }
+}
+
+/// Monotonicity depends on the quadrant.
+// TODO: Implement monotonicity of the ATAN2 function.
+pub fn atan2_monotonicity(_input: &[ExprProperties]) -> Result<SortProperties>
{
+ Ok(SortProperties::Unordered)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn cbrt_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn ceil_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ Ok(input[0].sort_properties)
+}
+
+/// Non-increasing on \[0, π\] and then non-decreasing on \[π, 2π\].
+/// This pattern repeats periodically with a period of 2π.
+// TODO: Implement monotonicity of the COS function.
+pub fn cos_monotonicity(_input: &[ExprProperties]) -> Result<SortProperties> {
+ Ok(SortProperties::Unordered)
+}
+
+/// Non-decreasing for x ≥ 0 and symmetrically non-increasing for x ≤ 0.
+pub fn cosh_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ let arg = &input[0];
+ let range = &arg.range;
+
+ let zero_point = Interval::make_zero(&range.lower().data_type())?;
+
+ if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+ Ok(arg.sort_properties)
+ } else if range.lt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+ Ok(-arg.sort_properties)
+ } else {
+ Ok(SortProperties::Unordered)
+ }
+}
+
+/// Non-decreasing function that converts radians to degrees.
+pub fn degrees_monotonicity(input: &[ExprProperties]) ->
Result<SortProperties> {
+ Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn exp_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn floor_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for x ≥ 0, undefined otherwise.
+pub fn ln_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ let arg = &input[0];
+ let range = &arg.range;
+
+ let zero_point = Interval::make_zero(&range.lower().data_type())?;
+
+ if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+ Ok(arg.sort_properties)
+ } else {
+ exec_err!("Input range of LN contains out-of-domain values")
+ }
+}
+
+/// Non-decreasing for x ≥ 0, undefined otherwise.
+pub fn log2_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ let arg = &input[0];
+ let range = &arg.range;
+
+ let zero_point = Interval::make_zero(&range.lower().data_type())?;
+
+ if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+ Ok(arg.sort_properties)
+ } else {
+ exec_err!("Input range of LOG2 contains out-of-domain values")
+ }
+}
+
+/// Non-decreasing for x ≥ 0, undefined otherwise.
+pub fn log10_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ let arg = &input[0];
+ let range = &arg.range;
+
+ let zero_point = Interval::make_zero(&range.lower().data_type())?;
+
+ if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+ Ok(arg.sort_properties)
+ } else {
+ exec_err!("Input range of LOG10 contains out-of-domain values")
+ }
+}
+
+/// Non-decreasing for all real numbers x.
+pub fn radians_monotonicity(input: &[ExprProperties]) ->
Result<SortProperties> {
+ Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for all real numbers x.
+pub fn signum_monotonicity(input: &[ExprProperties]) -> Result<SortProperties>
{
+ Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing on \[−π/2, π/2\] and then non-increasing on \[π/2, 3π/2\].
+/// This pattern repeats periodically with a period of 2π.
+// TODO: Implement monotonicity of the SIN function.
+pub fn sin_monotonicity(_input: &[ExprProperties]) -> Result<SortProperties> {
+ Ok(SortProperties::Unordered)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn sinh_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for x ≥ 0, undefined otherwise.
+pub fn sqrt_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ let arg = &input[0];
+ let range = &arg.range;
+
+ let zero_point = Interval::make_zero(&range.lower().data_type())?;
+
+ if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+ Ok(arg.sort_properties)
+ } else {
+ exec_err!("Input range of SQRT contains out-of-domain values")
+ }
+}
+
+/// Non-decreasing between vertical asymptotes at x = k * π ± π / 2 for any
+/// integer k.
+// TODO: Implement monotonicity of the TAN function.
+pub fn tan_monotonicity(_input: &[ExprProperties]) -> Result<SortProperties> {
+ Ok(SortProperties::Unordered)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn tanh_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+ Ok(input[0].sort_properties)
+}
diff --git a/datafusion/functions/src/math/pi.rs
b/datafusion/functions/src/math/pi.rs
index f9403e411f..60c94b6ca6 100644
--- a/datafusion/functions/src/math/pi.rs
+++ b/datafusion/functions/src/math/pi.rs
@@ -19,10 +19,9 @@ use std::any::Any;
use arrow::datatypes::DataType;
use arrow::datatypes::DataType::Float64;
-
use datafusion_common::{not_impl_err, Result, ScalarValue};
-use datafusion_expr::{ColumnarValue, FuncMonotonicity, Volatility};
-use datafusion_expr::{ScalarUDFImpl, Signature};
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
#[derive(Debug)]
pub struct PiFunc {
@@ -70,7 +69,8 @@ impl ScalarUDFImpl for PiFunc {
))))
}
- fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
- Ok(Some(vec![Some(true)]))
+ fn monotonicity(&self, _input: &[ExprProperties]) ->
Result<SortProperties> {
+ // This function returns a constant value.
+ Ok(SortProperties::Singleton)
}
}
diff --git a/datafusion/functions/src/math/round.rs
b/datafusion/functions/src/math/round.rs
index f4a163137a..600f4fd547 100644
--- a/datafusion/functions/src/math/round.rs
+++ b/datafusion/functions/src/math/round.rs
@@ -18,15 +18,15 @@
use std::any::Any;
use std::sync::Arc;
+use crate::utils::make_scalar_function;
+
use arrow::array::{ArrayRef, Float32Array, Float64Array, Int64Array};
use arrow::datatypes::DataType;
use arrow::datatypes::DataType::{Float32, Float64};
-
-use crate::utils::make_scalar_function;
use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::TypeSignature::Exact;
-use datafusion_expr::{ColumnarValue, FuncMonotonicity};
-use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
#[derive(Debug)]
pub struct RoundFunc {
@@ -80,8 +80,19 @@ impl ScalarUDFImpl for RoundFunc {
make_scalar_function(round, vec![])(args)
}
- fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
- Ok(Some(vec![Some(true)]))
+ fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties>
{
+ // round preserves the order of the first argument
+ let value = &input[0];
+ let precision = input.get(1);
+
+ if precision
+ .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
+ .unwrap_or(true)
+ {
+ Ok(value.sort_properties)
+ } else {
+ Ok(SortProperties::Unordered)
+ }
}
}
@@ -179,10 +190,12 @@ pub fn round(args: &[ArrayRef]) -> Result<ArrayRef> {
#[cfg(test)]
mod test {
+ use std::sync::Arc;
+
use crate::math::round::round;
+
use arrow::array::{ArrayRef, Float32Array, Float64Array, Int64Array};
use datafusion_common::cast::{as_float32_array, as_float64_array};
- use std::sync::Arc;
#[test]
fn test_round_f32() {
diff --git a/datafusion/functions/src/math/trunc.rs
b/datafusion/functions/src/math/trunc.rs
index 6f88099889..0c4d38564b 100644
--- a/datafusion/functions/src/math/trunc.rs
+++ b/datafusion/functions/src/math/trunc.rs
@@ -18,16 +18,16 @@
use std::any::Any;
use std::sync::Arc;
+use crate::utils::make_scalar_function;
+
use arrow::array::{ArrayRef, Float32Array, Float64Array, Int64Array};
use arrow::datatypes::DataType;
use arrow::datatypes::DataType::{Float32, Float64};
-
-use crate::utils::make_scalar_function;
use datafusion_common::ScalarValue::Int64;
use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::TypeSignature::Exact;
-use datafusion_expr::{ColumnarValue, FuncMonotonicity};
-use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
#[derive(Debug)]
pub struct TruncFunc {
@@ -86,8 +86,19 @@ impl ScalarUDFImpl for TruncFunc {
make_scalar_function(trunc, vec![])(args)
}
- fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
- Ok(Some(vec![Some(true)]))
+ fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties>
{
+ // trunc preserves the order of the first argument
+ let value = &input[0];
+ let precision = input.get(1);
+
+ if precision
+ .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
+ .unwrap_or(true)
+ {
+ Ok(value.sort_properties)
+ } else {
+ Ok(SortProperties::Unordered)
+ }
}
}
@@ -156,10 +167,12 @@ fn compute_truncate64(x: f64, y: i64) -> f64 {
#[cfg(test)]
mod test {
+ use std::sync::Arc;
+
use crate::math::trunc::trunc;
+
use arrow::array::{ArrayRef, Float32Array, Float64Array, Int64Array};
use datafusion_common::cast::{as_float32_array, as_float64_array};
- use std::sync::Arc;
#[test]
fn test_truncate_32() {
diff --git a/datafusion/physical-expr-common/src/lib.rs
b/datafusion/physical-expr-common/src/lib.rs
index 53e3134a1b..f335958698 100644
--- a/datafusion/physical-expr-common/src/lib.rs
+++ b/datafusion/physical-expr-common/src/lib.rs
@@ -19,6 +19,5 @@ pub mod aggregate;
pub mod expressions;
pub mod physical_expr;
pub mod sort_expr;
-pub mod sort_properties;
pub mod tree_node;
pub mod utils;
diff --git a/datafusion/physical-expr-common/src/physical_expr.rs
b/datafusion/physical-expr-common/src/physical_expr.rs
index a0f8bdf103..00b3dd725d 100644
--- a/datafusion/physical-expr-common/src/physical_expr.rs
+++ b/datafusion/physical-expr-common/src/physical_expr.rs
@@ -20,17 +20,17 @@ use std::fmt::{Debug, Display};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
+use crate::utils::scatter;
+
use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, not_impl_err, Result};
use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::ColumnarValue;
-use crate::sort_properties::SortProperties;
-use crate::utils::scatter;
-
/// See
[create_physical_expr](https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html)
/// for examples of creating `PhysicalExpr` from `Expr`
pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
@@ -154,17 +154,13 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug +
PartialEq<dyn Any> {
/// directly because it must remain object safe.
fn dyn_hash(&self, _state: &mut dyn Hasher);
- /// The order information of a PhysicalExpr can be estimated from its
children.
- /// This is especially helpful for projection expressions. If we can
ensure that the
- /// order of a PhysicalExpr to project matches with the order of SortExec,
we can
- /// eliminate that SortExecs.
- ///
- /// By recursively calling this function, we can obtain the overall order
- /// information of the PhysicalExpr. Since `SortOptions` cannot fully
handle
- /// the propagation of unordered columns and literals, the `SortProperties`
- /// struct is used.
- fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
- SortProperties::Unordered
+ /// Calculates the properties of this [`PhysicalExpr`] based on its
+ /// children's properties (i.e. order and range), recursively aggregating
+ /// the information from its children. In cases where the [`PhysicalExpr`]
+ /// has no children (e.g., `Literal` or `Column`), these properties should
+ /// be specified externally, as the function defaults to unknown
properties.
+ fn get_properties(&self, _children: &[ExprProperties]) ->
Result<ExprProperties> {
+ Ok(ExprProperties::new_unknown())
}
}
diff --git a/datafusion/physical-expr-common/src/utils.rs
b/datafusion/physical-expr-common/src/utils.rs
index 459b5a4849..601d344e4a 100644
--- a/datafusion/physical-expr-common/src/utils.rs
+++ b/datafusion/physical-expr-common/src/utils.rs
@@ -15,13 +15,34 @@
// specific language governing permissions and limitations
// under the License.
-use arrow::{
- array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData},
- compute::{and_kleene, is_not_null, SlicesIterator},
+use std::sync::Arc;
+
+use crate::{
+ physical_expr::PhysicalExpr, sort_expr::PhysicalSortExpr,
tree_node::ExprContext,
};
-use datafusion_common::Result;
-use crate::sort_expr::PhysicalSortExpr;
+use arrow::array::{make_array, Array, ArrayRef, BooleanArray,
MutableArrayData};
+use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
+use datafusion_common::Result;
+use datafusion_expr::sort_properties::ExprProperties;
+
+/// Represents a [`PhysicalExpr`] node with associated properties (order and
+/// range) in a context where properties are tracked.
+pub type ExprPropertiesNode = ExprContext<ExprProperties>;
+
+impl ExprPropertiesNode {
+ /// Constructs a new `ExprPropertiesNode` with unknown properties for a
+ /// given physical expression. Child nodes are built recursively for all of
+ /// the expression's children, each also initialized with unknown properties.
+ pub fn new_unknown(expr: Arc<dyn PhysicalExpr>) -> Self {
+ let children =
expr.children().into_iter().map(Self::new_unknown).collect();
+ Self {
+ expr,
+ data: ExprProperties::new_unknown(),
+ children,
+ }
+ }
+}
/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`,
next values of `truthy`
/// are taken, when the mask evaluates `false` values null values are filled.
diff --git a/datafusion/physical-expr/src/equivalence/mod.rs
b/datafusion/physical-expr/src/equivalence/mod.rs
index 3ce641c5aa..7faf2caae0 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -70,7 +70,6 @@ pub fn add_offset_to_expr(
#[cfg(test)]
mod tests {
-
use super::*;
use crate::expressions::col;
use crate::PhysicalSortExpr;
@@ -147,7 +146,7 @@ mod tests {
let col_f = &col("f", &test_schema)?;
let col_g = &col("g", &test_schema)?;
let mut eq_properties =
EquivalenceProperties::new(test_schema.clone());
- eq_properties.add_equal_conditions(col_a, col_c);
+ eq_properties.add_equal_conditions(col_a, col_c)?;
let option_asc = SortOptions {
descending: false,
@@ -204,7 +203,7 @@ mod tests {
let mut eq_properties =
EquivalenceProperties::new(test_schema.clone());
// Define a and f are aliases
- eq_properties.add_equal_conditions(col_a, col_f);
+ eq_properties.add_equal_conditions(col_a, col_f)?;
// Column e has constant value.
eq_properties = eq_properties.add_constants([col_e.clone()]);
@@ -338,11 +337,11 @@ mod tests {
let col_y_expr = Arc::new(Column::new("y", 4)) as Arc<dyn
PhysicalExpr>;
// a and b are aliases
- eq_properties.add_equal_conditions(&col_a_expr, &col_b_expr);
+ eq_properties.add_equal_conditions(&col_a_expr, &col_b_expr)?;
assert_eq!(eq_properties.eq_group().len(), 1);
// This new entry is redundant, size shouldn't increase
- eq_properties.add_equal_conditions(&col_b_expr, &col_a_expr);
+ eq_properties.add_equal_conditions(&col_b_expr, &col_a_expr)?;
assert_eq!(eq_properties.eq_group().len(), 1);
let eq_groups = &eq_properties.eq_group().classes[0];
assert_eq!(eq_groups.len(), 2);
@@ -351,7 +350,7 @@ mod tests {
// b and c are aliases. Exising equivalence class should expand,
// however there shouldn't be any new equivalence class
- eq_properties.add_equal_conditions(&col_b_expr, &col_c_expr);
+ eq_properties.add_equal_conditions(&col_b_expr, &col_c_expr)?;
assert_eq!(eq_properties.eq_group().len(), 1);
let eq_groups = &eq_properties.eq_group().classes[0];
assert_eq!(eq_groups.len(), 3);
@@ -360,12 +359,12 @@ mod tests {
assert!(eq_groups.contains(&col_c_expr));
// This is a new set of equality. Hence equivalent class count should
be 2.
- eq_properties.add_equal_conditions(&col_x_expr, &col_y_expr);
+ eq_properties.add_equal_conditions(&col_x_expr, &col_y_expr)?;
assert_eq!(eq_properties.eq_group().len(), 2);
// This equality bridges distinct equality sets.
// Hence equivalent class count should decrease from 2 to 1.
- eq_properties.add_equal_conditions(&col_x_expr, &col_a_expr);
+ eq_properties.add_equal_conditions(&col_x_expr, &col_a_expr)?;
assert_eq!(eq_properties.eq_group().len(), 1);
let eq_groups = &eq_properties.eq_group().classes[0];
assert_eq!(eq_groups.len(), 5);
diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs
b/datafusion/physical-expr/src/equivalence/ordering.rs
index ed4600f2d9..7857d9df72 100644
--- a/datafusion/physical-expr/src/equivalence/ordering.rs
+++ b/datafusion/physical-expr/src/equivalence/ordering.rs
@@ -223,26 +223,26 @@ fn resolve_overlap(orderings: &mut [LexOrdering], idx:
usize, pre_idx: usize) ->
mod tests {
use std::sync::Arc;
- use arrow::datatypes::{DataType, Field, Schema};
- use arrow_schema::SortOptions;
- use itertools::Itertools;
-
- use datafusion_common::{DFSchema, Result};
- use datafusion_expr::{Operator, ScalarUDF};
-
use crate::equivalence::tests::{
convert_to_orderings, convert_to_sort_exprs, create_random_schema,
- create_test_params, generate_table_for_eq_properties,
is_table_same_after_sort,
+ create_test_params, create_test_schema,
generate_table_for_eq_properties,
+ is_table_same_after_sort,
};
- use crate::equivalence::{tests::create_test_schema, EquivalenceProperties};
use crate::equivalence::{
- EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass,
+ EquivalenceClass, EquivalenceGroup, EquivalenceProperties,
+ OrderingEquivalenceClass,
};
- use crate::expressions::Column;
- use crate::expressions::{col, BinaryExpr};
+ use crate::expressions::{col, BinaryExpr, Column};
use crate::utils::tests::TestScalarUDF;
use crate::{PhysicalExpr, PhysicalSortExpr};
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow_schema::SortOptions;
+ use datafusion_common::{DFSchema, Result};
+ use datafusion_expr::{Operator, ScalarUDF};
+
+ use itertools::Itertools;
+
#[test]
fn test_ordering_satisfy() -> Result<()> {
let input_schema = Arc::new(Schema::new(vec![
@@ -883,7 +883,7 @@ mod tests {
};
// a=c (e.g they are aliases).
let mut eq_properties = EquivalenceProperties::new(test_schema);
- eq_properties.add_equal_conditions(col_a, col_c);
+ eq_properties.add_equal_conditions(col_a, col_c)?;
let orderings = vec![
vec![(col_a, options)],
diff --git a/datafusion/physical-expr/src/equivalence/projection.rs
b/datafusion/physical-expr/src/equivalence/projection.rs
index 260610f23d..b5ac149d8b 100644
--- a/datafusion/physical-expr/src/equivalence/projection.rs
+++ b/datafusion/physical-expr/src/equivalence/projection.rs
@@ -17,14 +17,13 @@
use std::sync::Arc;
-use arrow::datatypes::SchemaRef;
+use crate::expressions::Column;
+use crate::PhysicalExpr;
+use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{internal_err, Result};
-use crate::expressions::Column;
-use crate::PhysicalExpr;
-
/// Stores the mapping between source expressions and target expressions for a
/// projection.
#[derive(Debug, Clone)]
@@ -114,14 +113,7 @@ impl ProjectionMapping {
#[cfg(test)]
mod tests {
-
- use arrow::datatypes::{DataType, Field, Schema};
- use arrow_schema::{SortOptions, TimeUnit};
- use itertools::Itertools;
-
- use datafusion_common::DFSchema;
- use datafusion_expr::{Operator, ScalarUDF};
-
+ use super::*;
use crate::equivalence::tests::{
apply_projection, convert_to_orderings, convert_to_orderings_owned,
create_random_schema, generate_table_for_eq_properties,
is_table_same_after_sort,
@@ -133,7 +125,12 @@ mod tests {
use crate::utils::tests::TestScalarUDF;
use crate::PhysicalSortExpr;
- use super::*;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow_schema::{SortOptions, TimeUnit};
+ use datafusion_common::DFSchema;
+ use datafusion_expr::{Operator, ScalarUDF};
+
+ use itertools::Itertools;
#[test]
fn project_orderings() -> Result<()> {
@@ -941,7 +938,7 @@ mod tests {
for (orderings, equal_columns, expected) in test_cases {
let mut eq_properties = EquivalenceProperties::new(schema.clone());
for (lhs, rhs) in equal_columns {
- eq_properties.add_equal_conditions(lhs, rhs);
+ eq_properties.add_equal_conditions(lhs, rhs)?;
}
let orderings = convert_to_orderings(&orderings);
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs
b/datafusion/physical-expr/src/equivalence/properties.rs
index c654208208..016c4c4ae1 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -18,25 +18,27 @@
use std::hash::{Hash, Hasher};
use std::sync::Arc;
-use arrow_schema::{SchemaRef, SortOptions};
-use indexmap::{IndexMap, IndexSet};
-use itertools::Itertools;
-
-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{JoinSide, JoinType, Result};
-
+use super::ordering::collapse_lex_ordering;
use crate::equivalence::{
collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass,
ProjectionMapping,
};
use crate::expressions::{CastExpr, Literal};
-use crate::sort_properties::{ExprOrdering, SortProperties};
use crate::{
physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement,
LexRequirementRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
PhysicalSortRequirement,
};
-use super::ordering::collapse_lex_ordering;
+use arrow_schema::{SchemaRef, SortOptions};
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{JoinSide, JoinType, Result};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+use datafusion_physical_expr_common::expressions::column::Column;
+use datafusion_physical_expr_common::utils::ExprPropertiesNode;
+
+use indexmap::{IndexMap, IndexSet};
+use itertools::Itertools;
/// A `EquivalenceProperties` object stores useful information related to a
schema.
/// Currently, it keeps track of:
@@ -197,7 +199,7 @@ impl EquivalenceProperties {
&mut self,
left: &Arc<dyn PhysicalExpr>,
right: &Arc<dyn PhysicalExpr>,
- ) {
+ ) -> Result<()> {
// Discover new constants in light of new the equality:
if self.is_expr_constant(left) {
// Left expression is constant, add right as constant
@@ -216,27 +218,34 @@ impl EquivalenceProperties {
let mut new_orderings = vec![];
for ordering in self.normalized_oeq_class().iter() {
let expressions = if left.eq(&ordering[0].expr) {
- // left expression is leading ordering
+ // Left expression is leading ordering
Some((ordering[0].options, right))
} else if right.eq(&ordering[0].expr) {
- // right expression is leading ordering
+ // Right expression is leading ordering
Some((ordering[0].options, left))
} else {
None
};
if let Some((leading_ordering, other_expr)) = expressions {
- // Only handle expressions with exactly one child
- // TODO: it should be possible to handle expressions orderings
f(a, b, c), a, b, c
- // if f is monotonic in all arguments
- // First Expression after leading ordering
+ // Currently, we only handle expressions with a single child.
+ // TODO: It should be possible to handle expression orderings like
+ // f(a, b, c), a, b, c if f is monotonic in all arguments.
+ // First expression after leading ordering
if let Some(next_expr) = ordering.get(1) {
let children = other_expr.children();
if children.len() == 1
&& children[0].eq(&next_expr.expr)
&& SortProperties::Ordered(leading_ordering)
- ==
other_expr.get_ordering(&[SortProperties::Ordered(
- next_expr.options,
- )])
+ == other_expr
+ .get_properties(&[ExprProperties {
+ sort_properties: SortProperties::Ordered(
+ leading_ordering,
+ ),
+ range: Interval::make_unbounded(
+ &other_expr.data_type(&self.schema)?,
+ )?,
+ }])?
+ .sort_properties
{
// Assume existing ordering is [a ASC, b ASC]
// When equality a = f(b) is given, If we know that
given ordering `[b ASC]`, ordering `[f(b) ASC]` is valid,
@@ -254,6 +263,7 @@ impl EquivalenceProperties {
// Add equal expressions to the state
self.eq_group.add_equal_conditions(left, right);
+ Ok(())
}
/// Track/register physical expressions with constant values.
@@ -378,11 +388,15 @@ impl EquivalenceProperties {
///
/// Returns `true` if the specified ordering is satisfied, `false`
otherwise.
fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool {
- let expr_ordering = self.get_expr_ordering(req.expr.clone());
- let ExprOrdering { expr, data, .. } = expr_ordering;
- match data {
+ let ExprProperties {
+ sort_properties, ..
+ } = self.get_expr_properties(req.expr.clone());
+ match sort_properties {
SortProperties::Ordered(options) => {
- let sort_expr = PhysicalSortExpr { expr, options };
+ let sort_expr = PhysicalSortExpr {
+ expr: req.expr.clone(),
+ options,
+ };
sort_expr.satisfy(req, self.schema())
}
// Singleton expressions satisfies any ordering.
@@ -698,8 +712,9 @@ impl EquivalenceProperties {
referred_dependencies(&dependency_map, source)
.into_iter()
.filter_map(|relevant_deps| {
- if let SortProperties::Ordered(options) =
- get_expr_ordering(source, &relevant_deps)
+ if let Ok(SortProperties::Ordered(options)) =
+ get_expr_properties(source, &relevant_deps,
&self.schema)
+ .map(|prop| prop.sort_properties)
{
Some((options, relevant_deps))
} else {
@@ -837,16 +852,27 @@ impl EquivalenceProperties {
let ordered_exprs = search_indices
.iter()
.flat_map(|&idx| {
- let ExprOrdering { expr, data, .. } =
- eq_properties.get_expr_ordering(exprs[idx].clone());
- match data {
- SortProperties::Ordered(options) => {
- Some((PhysicalSortExpr { expr, options }, idx))
- }
+ let ExprProperties {
+ sort_properties, ..
+ } = eq_properties.get_expr_properties(exprs[idx].clone());
+ match sort_properties {
+ SortProperties::Ordered(options) => Some((
+ PhysicalSortExpr {
+ expr: exprs[idx].clone(),
+ options,
+ },
+ idx,
+ )),
SortProperties::Singleton => {
// Assign default ordering to constant expressions
let options = SortOptions::default();
- Some((PhysicalSortExpr { expr, options }, idx))
+ Some((
+ PhysicalSortExpr {
+ expr: exprs[idx].clone(),
+ options,
+ },
+ idx,
+ ))
}
SortProperties::Unordered => None,
}
@@ -895,32 +921,33 @@ impl EquivalenceProperties {
is_constant_recurse(&normalized_constants, &normalized_expr)
}
- /// Retrieves the ordering information for a given physical expression.
+ /// Retrieves the properties for a given physical expression.
///
- /// This function constructs an `ExprOrdering` object for the provided
+ /// This function constructs an [`ExprProperties`] object for the given
/// expression, which encapsulates information about the expression's
- /// ordering, including its [`SortProperties`].
+ /// properties, including its [`SortProperties`] and [`Interval`].
///
- /// # Arguments
+ /// # Parameters
///
/// - `expr`: An `Arc<dyn PhysicalExpr>` representing the physical
expression
/// for which ordering information is sought.
///
/// # Returns
///
- /// Returns an `ExprOrdering` object containing the ordering information
for
- /// the given expression.
- pub fn get_expr_ordering(&self, expr: Arc<dyn PhysicalExpr>) ->
ExprOrdering {
- ExprOrdering::new_default(expr.clone())
- .transform_up(|expr| Ok(update_ordering(expr, self)))
+ /// Returns an [`ExprProperties`] object containing the ordering and range
+ /// information for the given expression.
+ pub fn get_expr_properties(&self, expr: Arc<dyn PhysicalExpr>) ->
ExprProperties {
+ ExprPropertiesNode::new_unknown(expr)
+ .transform_up(|expr| update_properties(expr, self))
.data()
- // Guaranteed to always return `Ok`.
- .unwrap()
+ .map(|node| node.data)
+ .unwrap_or(ExprProperties::new_unknown())
}
}
-/// Calculates the [`SortProperties`] of a given [`ExprOrdering`] node.
-/// The node can either be a leaf node, or an intermediate node:
+/// Calculates the properties of a given [`ExprPropertiesNode`].
+///
+/// Order information can be retrieved as:
/// - If it is a leaf node, we directly find the order of the node by looking
/// at the given sort expression and equivalence properties if it is a `Column`
/// leaf, or we mark it as unordered. In the case of a `Literal` leaf, we mark
@@ -931,30 +958,41 @@ impl EquivalenceProperties {
/// node directly matches with the sort expression. If there is a match, the
/// sort expression emerges at that node immediately, discarding the recursive
/// result coming from its children.
-fn update_ordering(
- mut node: ExprOrdering,
+///
+/// Range information is calculated as:
+/// - If it is a `Literal` node, we set the range as a point value. If it is a
+/// `Column` node, we set the range to the unbounded interval of the column's
+/// datatype, since no tighter bound is known yet.
+/// - If it is an intermediate node, its children's properties determine its
+/// range. Each `PhysicalExpr` and operator has its own rules on how to
+/// propagate the children's ranges.
+fn update_properties(
+ mut node: ExprPropertiesNode,
eq_properties: &EquivalenceProperties,
-) -> Transformed<ExprOrdering> {
- // We have a Column, which is one of the two possible leaf node types:
+) -> Result<Transformed<ExprPropertiesNode>> {
+ // First, try to gather the information from the children:
+ if !node.expr.children().is_empty() {
+ // We have an intermediate (non-leaf) node, account for its children:
+ let children_props = node.children.iter().map(|c|
c.data.clone()).collect_vec();
+ node.data = node.expr.get_properties(&children_props)?;
+ } else if node.expr.as_any().is::<Literal>() {
+ // We have a Literal, which is one of the two possible leaf node types:
+ node.data = node.expr.get_properties(&[])?;
+ } else if node.expr.as_any().is::<Column>() {
+ // We have a Column, which is the other possible leaf node type:
+ node.data.range =
+
Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)?
+ }
+ // Now, check what we know about orderings:
let normalized_expr =
eq_properties.eq_group.normalize_expr(node.expr.clone());
if eq_properties.is_expr_constant(&normalized_expr) {
- node.data = SortProperties::Singleton;
+ node.data.sort_properties = SortProperties::Singleton;
} else if let Some(options) = eq_properties
.normalized_oeq_class()
.get_options(&normalized_expr)
{
- node.data = SortProperties::Ordered(options);
- } else if !node.expr.children().is_empty() {
- // We have an intermediate (non-leaf) node, account for its children:
- let children_orderings = node.children.iter().map(|c|
c.data).collect_vec();
- node.data = node.expr.get_ordering(&children_orderings);
- } else if node.expr.as_any().is::<Literal>() {
- // We have a Literal, which is the other possible leaf node type:
- node.data = node.expr.get_ordering(&[]);
- } else {
- return Transformed::no(node);
+ node.data.sort_properties = SortProperties::Ordered(options);
}
- Transformed::yes(node)
+ Ok(Transformed::yes(node))
}
/// This function determines whether the provided expression is constant
@@ -1124,8 +1162,9 @@ fn generate_dependency_orderings(
.collect()
}
-/// This function examines the given expression and the sort expressions it
-/// refers to determine the ordering properties of the expression.
+/// This function examines the given expression and its properties to determine
+/// the ordering properties of the expression. The range knowledge is not yet
+/// utilized in the scope of this function.
///
/// # Parameters
///
@@ -1133,26 +1172,41 @@ fn generate_dependency_orderings(
/// which ordering properties need to be determined.
/// - `dependencies`: A reference to `Dependencies`, containing sort
expressions
/// referred to by `expr`.
+/// - `schema`: A reference to the schema to which the `expr` columns refer.
///
/// # Returns
///
/// A `SortProperties` indicating the ordering information of the given
expression.
-fn get_expr_ordering(
+fn get_expr_properties(
expr: &Arc<dyn PhysicalExpr>,
dependencies: &Dependencies,
-) -> SortProperties {
+ schema: &SchemaRef,
+) -> Result<ExprProperties> {
if let Some(column_order) = dependencies.iter().find(|&order|
expr.eq(&order.expr)) {
// If exact match is found, return its ordering.
- SortProperties::Ordered(column_order.options)
+ Ok(ExprProperties {
+ sort_properties: SortProperties::Ordered(column_order.options),
+ range: Interval::make_unbounded(&expr.data_type(schema)?)?,
+ })
+ } else if expr.as_any().downcast_ref::<Column>().is_some() {
+ Ok(ExprProperties {
+ sort_properties: SortProperties::Unordered,
+ range: Interval::make_unbounded(&expr.data_type(schema)?)?,
+ })
+ } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
+ Ok(ExprProperties {
+ sort_properties: SortProperties::Singleton,
+ range: Interval::try_new(literal.value().clone(),
literal.value().clone())?,
+ })
} else {
// Find orderings of its children
let child_states = expr
.children()
.iter()
- .map(|child| get_expr_ordering(child, dependencies))
- .collect::<Vec<_>>();
+ .map(|child| get_expr_properties(child, dependencies, schema))
+ .collect::<Result<Vec<_>>>()?;
// Calculate expression ordering using ordering of its children.
- expr.get_ordering(&child_states)
+ expr.get_properties(&child_states)
}
}
@@ -1351,12 +1405,7 @@ impl Hash for ExprWrapper {
mod tests {
use std::ops::Not;
- use arrow::datatypes::{DataType, Field, Schema};
- use arrow_schema::{Fields, TimeUnit};
-
- use datafusion_common::DFSchema;
- use datafusion_expr::{Operator, ScalarUDF};
-
+ use super::*;
use crate::equivalence::add_offset_to_expr;
use crate::equivalence::tests::{
convert_to_orderings, convert_to_sort_exprs, convert_to_sort_reqs,
@@ -1366,7 +1415,10 @@ mod tests {
use crate::expressions::{col, BinaryExpr, Column};
use crate::utils::tests::TestScalarUDF;
- use super::*;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow_schema::{Fields, TimeUnit};
+ use datafusion_common::DFSchema;
+ use datafusion_expr::{Operator, ScalarUDF};
#[test]
fn project_equivalence_properties_test() -> Result<()> {
@@ -1577,8 +1629,8 @@ mod tests {
let mut join_eq_properties =
EquivalenceProperties::new(Arc::new(schema));
// a=x and d=w
- join_eq_properties.add_equal_conditions(col_a, col_x);
- join_eq_properties.add_equal_conditions(col_d, col_w);
+ join_eq_properties.add_equal_conditions(col_a, col_x)?;
+ join_eq_properties.add_equal_conditions(col_d, col_w)?;
updated_right_ordering_equivalence_class(
&mut right_oeq_class,
@@ -1615,7 +1667,7 @@ mod tests {
let col_c_expr = col("c", &schema)?;
let mut eq_properties =
EquivalenceProperties::new(Arc::new(schema.clone()));
- eq_properties.add_equal_conditions(&col_a_expr, &col_c_expr);
+ eq_properties.add_equal_conditions(&col_a_expr, &col_c_expr)?;
let others = vec![
vec![PhysicalSortExpr {
expr: col_b_expr.clone(),
@@ -1760,7 +1812,7 @@ mod tests {
}
#[test]
- fn test_update_ordering() -> Result<()> {
+ fn test_update_properties() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
@@ -1778,7 +1830,7 @@ mod tests {
nulls_first: false,
};
// b=a (e.g they are aliases)
- eq_properties.add_equal_conditions(col_b, col_a);
+ eq_properties.add_equal_conditions(col_b, col_a)?;
// [b ASC], [d ASC]
eq_properties.add_new_orderings(vec![
vec![PhysicalSortExpr {
@@ -1821,12 +1873,12 @@ mod tests {
.iter()
.flat_map(|ordering| ordering.first().cloned())
.collect::<Vec<_>>();
- let expr_ordering = eq_properties.get_expr_ordering(expr.clone());
+ let expr_props = eq_properties.get_expr_properties(expr.clone());
let err_msg = format!(
"expr:{:?}, expected: {:?}, actual: {:?}, leading_orderings:
{leading_orderings:?}",
- expr, expected, expr_ordering.data
+ expr, expected, expr_props.sort_properties
);
- assert_eq!(expr_ordering.data, expected, "{}", err_msg);
+ assert_eq!(expr_props.sort_properties, expected, "{}", err_msg);
}
Ok(())
@@ -2266,6 +2318,7 @@ mod tests {
Ok(())
}
+
#[test]
fn test_eliminate_redundant_monotonic_sorts() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
@@ -2334,7 +2387,7 @@ mod tests {
for case in cases {
let mut properties =
base_properties.clone().add_constants(case.constants);
for [left, right] in &case.equal_conditions {
- properties.add_equal_conditions(left, right)
+ properties.add_equal_conditions(left, right)?
}
let sort = case
diff --git a/datafusion/physical-expr/src/expressions/binary.rs
b/datafusion/physical-expr/src/expressions/binary.rs
index 76154dca03..08f7523f92 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -23,21 +23,21 @@ use std::{any::Any, sync::Arc};
use crate::expressions::datum::{apply, apply_cmp};
use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
use crate::physical_expr::down_cast_any_ref;
-use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;
use arrow::array::*;
use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
use arrow::compute::kernels::cmp::*;
-use arrow::compute::kernels::comparison::regexp_is_match_utf8;
-use arrow::compute::kernels::comparison::regexp_is_match_utf8_scalar;
+use arrow::compute::kernels::comparison::{
+ regexp_is_match_utf8, regexp_is_match_utf8_scalar,
+};
use arrow::compute::kernels::concat_elements::concat_elements_utf8;
use arrow::compute::{cast, ilike, like, nilike, nlike};
use arrow::datatypes::*;
-
use datafusion_common::cast::as_boolean_array;
use datafusion_common::{internal_err, Result, ScalarValue};
use datafusion_expr::interval_arithmetic::{apply_operator, Interval};
+use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::type_coercion::binary::get_result_type;
use datafusion_expr::{ColumnarValue, Operator};
@@ -442,17 +442,45 @@ impl PhysicalExpr for BinaryExpr {
self.hash(&mut s);
}
- /// For each operator, [`BinaryExpr`] has distinct ordering rules.
- /// TODO: There may be rules specific to some data types (such as division
and multiplication on unsigned integers)
- fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
- let (left_child, right_child) = (&children[0], &children[1]);
+ /// For each operator, [`BinaryExpr`] has distinct rules.
+ /// TODO: There may be rules specific to some data types and expression ranges.
+ fn get_properties(&self, children: &[ExprProperties]) ->
Result<ExprProperties> {
+ let (l_order, l_range) = (children[0].sort_properties,
&children[0].range);
+ let (r_order, r_range) = (children[1].sort_properties,
&children[1].range);
match self.op() {
- Operator::Plus => left_child.add(right_child),
- Operator::Minus => left_child.sub(right_child),
- Operator::Gt | Operator::GtEq =>
left_child.gt_or_gteq(right_child),
- Operator::Lt | Operator::LtEq =>
right_child.gt_or_gteq(left_child),
- Operator::And | Operator::Or => left_child.and_or(right_child),
- _ => SortProperties::Unordered,
+ Operator::Plus => Ok(ExprProperties {
+ sort_properties: l_order.add(&r_order),
+ range: l_range.add(r_range)?,
+ }),
+ Operator::Minus => Ok(ExprProperties {
+ sort_properties: l_order.sub(&r_order),
+ range: l_range.sub(r_range)?,
+ }),
+ Operator::Gt => Ok(ExprProperties {
+ sort_properties: l_order.gt_or_gteq(&r_order),
+ range: l_range.gt(r_range)?,
+ }),
+ Operator::GtEq => Ok(ExprProperties {
+ sort_properties: l_order.gt_or_gteq(&r_order),
+ range: l_range.gt_eq(r_range)?,
+ }),
+ Operator::Lt => Ok(ExprProperties {
+ sort_properties: r_order.gt_or_gteq(&l_order),
+ range: l_range.lt(r_range)?,
+ }),
+ Operator::LtEq => Ok(ExprProperties {
+ sort_properties: r_order.gt_or_gteq(&l_order),
+ range: l_range.lt_eq(r_range)?,
+ }),
+ Operator::And => Ok(ExprProperties {
+ sort_properties: r_order.and_or(&l_order),
+ range: l_range.and(r_range)?,
+ }),
+ Operator::Or => Ok(ExprProperties {
+ sort_properties: r_order.and_or(&l_order),
+ range: l_range.or(r_range)?,
+ }),
+ _ => Ok(ExprProperties::new_unknown()),
}
}
}
@@ -623,6 +651,7 @@ pub fn binary(
mod tests {
use super::*;
use crate::expressions::{col, lit, try_cast, Literal};
+
use datafusion_common::plan_datafusion_err;
use datafusion_expr::type_coercion::binary::get_input_types;
diff --git a/datafusion/physical-expr/src/expressions/cast.rs
b/datafusion/physical-expr/src/expressions/cast.rs
index a3b32461e5..79a44ac30c 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -15,21 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-use crate::physical_expr::down_cast_any_ref;
-use crate::sort_properties::SortProperties;
-use crate::PhysicalExpr;
use std::any::Any;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
-use DataType::*;
+
+use crate::physical_expr::down_cast_any_ref;
+use crate::PhysicalExpr;
use arrow::compute::{can_cast_types, CastOptions};
-use arrow::datatypes::{DataType, Schema};
+use arrow::datatypes::{DataType, DataType::*, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::format::DEFAULT_FORMAT_OPTIONS;
use datafusion_common::{not_impl_err, Result};
use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::ColumnarValue;
const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions {
@@ -163,9 +163,21 @@ impl PhysicalExpr for CastExpr {
self.cast_options.hash(&mut s);
}
- /// A [`CastExpr`] preserves the ordering of its child.
- fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
- children[0]
+ /// A [`CastExpr`] preserves the ordering of its child if the cast is done
+ /// under the same datatype family.
+ fn get_properties(&self, children: &[ExprProperties]) ->
Result<ExprProperties> {
+ let source_datatype = children[0].range.data_type();
+ let target_type = &self.cast_type;
+
+ let unbounded = Interval::make_unbounded(target_type)?;
+ if source_datatype.is_numeric() && target_type.is_numeric()
+ || source_datatype.is_temporal() && target_type.is_temporal()
+ || source_datatype.eq(target_type)
+ {
+ Ok(children[0].clone().with_range(unbounded))
+ } else {
+ Ok(ExprProperties::new_unknown().with_range(unbounded))
+ }
}
}
diff --git a/datafusion/physical-expr/src/expressions/literal.rs
b/datafusion/physical-expr/src/expressions/literal.rs
index 35ea80ea57..371028959a 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -22,7 +22,6 @@ use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::physical_expr::down_cast_any_ref;
-use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;
use arrow::{
@@ -30,6 +29,8 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{ColumnarValue, Expr};
/// Represents a literal value
@@ -90,8 +91,11 @@ impl PhysicalExpr for Literal {
self.hash(&mut s);
}
- fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
- SortProperties::Singleton
+ fn get_properties(&self, _children: &[ExprProperties]) ->
Result<ExprProperties> {
+ Ok(ExprProperties {
+ sort_properties: SortProperties::Singleton,
+ range: Interval::try_new(self.value().clone(),
self.value().clone())?,
+ })
}
}
@@ -115,6 +119,7 @@ pub fn lit<T: datafusion_expr::Literal>(value: T) ->
Arc<dyn PhysicalExpr> {
#[cfg(test)]
mod tests {
use super::*;
+
use arrow::array::Int32Array;
use arrow::datatypes::*;
use datafusion_common::cast::as_int32_array;
diff --git a/datafusion/physical-expr/src/expressions/negative.rs
b/datafusion/physical-expr/src/expressions/negative.rs
index f6d4620c42..62f865bd9b 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -22,7 +22,6 @@ use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::physical_expr::down_cast_any_ref;
-use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;
use arrow::{
@@ -32,6 +31,7 @@ use arrow::{
};
use datafusion_common::{plan_err, Result};
use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::{
type_coercion::{is_interval, is_null, is_signed_numeric, is_timestamp},
ColumnarValue,
@@ -134,8 +134,11 @@ impl PhysicalExpr for NegativeExpr {
}
/// The ordering of a [`NegativeExpr`] is simply the reverse of its child.
- fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
- -children[0]
+ fn get_properties(&self, children: &[ExprProperties]) ->
Result<ExprProperties> {
+ Ok(ExprProperties {
+ sort_properties: -children[0].sort_properties,
+ range: children[0].range.clone().arithmetic_negate()?,
+ })
}
}
diff --git a/datafusion/physical-expr/src/functions.rs
b/datafusion/physical-expr/src/functions.rs
index 21cf6d348c..9c7d6d0934 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -33,14 +33,12 @@
use std::sync::Arc;
-use arrow::array::ArrayRef;
-use arrow_array::Array;
-
-pub use crate::scalar_function::create_physical_expr;
+use arrow::array::{Array, ArrayRef};
use datafusion_common::{Result, ScalarValue};
-pub use datafusion_expr::FuncMonotonicity;
use datafusion_expr::{ColumnarValue, ScalarFunctionImplementation};
+pub use crate::scalar_function::create_physical_expr;
+
#[derive(Debug, Clone, Copy)]
pub enum Hint {
/// Indicates the argument needs to be padded if it is scalar
diff --git a/datafusion/physical-expr/src/lib.rs
b/datafusion/physical-expr/src/lib.rs
index aef5aa7c00..1bdf082b2e 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -61,13 +61,6 @@ pub use scalar_function::ScalarFunctionExpr;
pub use datafusion_physical_expr_common::utils::reverse_order_bys;
pub use utils::split_conjunction;
-// For backwards compatibility
-pub mod sort_properties {
- pub use datafusion_physical_expr_common::sort_properties::{
- ExprOrdering, SortProperties,
- };
-}
-
// For backwards compatibility
pub mod tree_node {
pub use datafusion_physical_expr_common::tree_node::ExprContext;
diff --git a/datafusion/physical-expr/src/scalar_function.rs
b/datafusion/physical-expr/src/scalar_function.rs
index 1244a9b4db..daa1100710 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -32,19 +32,18 @@
use std::any::Any;
use std::fmt::{self, Debug, Formatter};
use std::hash::{Hash, Hasher};
-use std::ops::Neg;
use std::sync::Arc;
+use crate::physical_expr::{down_cast_any_ref, physical_exprs_equal};
+use crate::PhysicalExpr;
+
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
-
use datafusion_common::{internal_err, DFSchema, Result};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::type_coercion::functions::data_types_with_scalar_udf;
-use datafusion_expr::{expr_vec_fmt, ColumnarValue, Expr, FuncMonotonicity,
ScalarUDF};
-
-use crate::physical_expr::{down_cast_any_ref, physical_exprs_equal};
-use crate::sort_properties::SortProperties;
-use crate::PhysicalExpr;
+use datafusion_expr::{expr_vec_fmt, ColumnarValue, Expr, ScalarUDF};
/// Physical expression of a scalar function
pub struct ScalarFunctionExpr {
@@ -52,11 +51,6 @@ pub struct ScalarFunctionExpr {
name: String,
args: Vec<Arc<dyn PhysicalExpr>>,
return_type: DataType,
- // Keeps monotonicity information of the function.
- // FuncMonotonicity vector is one to one mapped to `args`,
- // and it specifies the effect of an increase or decrease in
- // the corresponding `arg` to the function value.
- monotonicity: Option<FuncMonotonicity>,
}
impl Debug for ScalarFunctionExpr {
@@ -66,7 +60,6 @@ impl Debug for ScalarFunctionExpr {
.field("name", &self.name)
.field("args", &self.args)
.field("return_type", &self.return_type)
- .field("monotonicity", &self.monotonicity)
.finish()
}
}
@@ -78,14 +71,12 @@ impl ScalarFunctionExpr {
fun: Arc<ScalarUDF>,
args: Vec<Arc<dyn PhysicalExpr>>,
return_type: DataType,
- monotonicity: Option<FuncMonotonicity>,
) -> Self {
Self {
fun,
name: name.to_owned(),
args,
return_type,
- monotonicity,
}
}
@@ -108,11 +99,6 @@ impl ScalarFunctionExpr {
pub fn return_type(&self) -> &DataType {
&self.return_type
}
-
- /// Monotonicity information of the function
- pub fn monotonicity(&self) -> &Option<FuncMonotonicity> {
- &self.monotonicity
- }
}
impl fmt::Display for ScalarFunctionExpr {
@@ -170,10 +156,21 @@ impl PhysicalExpr for ScalarFunctionExpr {
self.fun.clone(),
children,
self.return_type().clone(),
- self.monotonicity.clone(),
)))
}
+ fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
+ self.fun.evaluate_bounds(children)
+ }
+
+ fn propagate_constraints(
+ &self,
+ interval: &Interval,
+ children: &[&Interval],
+ ) -> Result<Option<Vec<Interval>>> {
+ self.fun.propagate_constraints(interval, children)
+ }
+
fn dyn_hash(&self, state: &mut dyn Hasher) {
let mut s = state;
self.name.hash(&mut s);
@@ -182,11 +179,18 @@ impl PhysicalExpr for ScalarFunctionExpr {
// Add `self.fun` when hash is available
}
- fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
- self.monotonicity
- .as_ref()
- .map(|monotonicity| out_ordering(monotonicity, children))
- .unwrap_or(SortProperties::Unordered)
+ fn get_properties(&self, children: &[ExprProperties]) ->
Result<ExprProperties> {
+ let sort_properties = self.fun.monotonicity(children)?;
+ let children_range = children
+ .iter()
+ .map(|props| &props.range)
+ .collect::<Vec<_>>();
+ let range = self.fun().evaluate_bounds(&children_range)?;
+
+ Ok(ExprProperties {
+ sort_properties,
+ range,
+ })
}
}
@@ -231,63 +235,5 @@ pub fn create_physical_expr(
Arc::new(fun.clone()),
input_phy_exprs.to_vec(),
return_type,
- fun.monotonicity()?,
)))
}
-
-/// Determines a [ScalarFunctionExpr]'s monotonicity for the given arguments
-/// and the function's behavior depending on its arguments.
-///
-/// [ScalarFunctionExpr]: crate::scalar_function::ScalarFunctionExpr
-pub fn out_ordering(
- func: &FuncMonotonicity,
- arg_orderings: &[SortProperties],
-) -> SortProperties {
- func.iter().zip(arg_orderings).fold(
- SortProperties::Singleton,
- |prev_sort, (item, arg)| {
- let current_sort = func_order_in_one_dimension(item, arg);
-
- match (prev_sort, current_sort) {
- (_, SortProperties::Unordered) => SortProperties::Unordered,
- (SortProperties::Singleton, SortProperties::Ordered(_)) =>
current_sort,
- (SortProperties::Ordered(prev),
SortProperties::Ordered(current))
- if prev.descending != current.descending =>
- {
- SortProperties::Unordered
- }
- _ => prev_sort,
- }
- },
- )
-}
-
-/// This function decides the monotonicity property of a [ScalarFunctionExpr]
for a single argument (i.e. across a single dimension), given that argument's
sort properties.
-///
-/// [ScalarFunctionExpr]: crate::scalar_function::ScalarFunctionExpr
-fn func_order_in_one_dimension(
- func_monotonicity: &Option<bool>,
- arg: &SortProperties,
-) -> SortProperties {
- if *arg == SortProperties::Singleton {
- SortProperties::Singleton
- } else {
- match func_monotonicity {
- None => SortProperties::Unordered,
- Some(false) => {
- if let SortProperties::Ordered(_) = arg {
- arg.neg()
- } else {
- SortProperties::Unordered
- }
- }
- Some(true) => {
- if let SortProperties::Ordered(_) = arg {
- *arg
- } else {
- SortProperties::Unordered
- }
- }
- }
- }
-}
diff --git a/datafusion/physical-expr/src/utils/mod.rs
b/datafusion/physical-expr/src/utils/mod.rs
index 76cee3a1a7..6b964546cb 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -255,19 +255,18 @@ pub fn merge_vectors(
#[cfg(test)]
pub(crate) mod tests {
- use arrow_array::{ArrayRef, Float32Array, Float64Array};
use std::any::Any;
use std::fmt::{Display, Formatter};
use super::*;
use crate::expressions::{binary, cast, col, in_list, lit, Literal};
+ use arrow_array::{ArrayRef, Float32Array, Float64Array};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::{exec_err, DataFusionError, ScalarValue};
+ use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+ use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
- use datafusion_expr::{
- ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility,
- };
use petgraph::visit::Bfs;
#[derive(Debug, Clone)]
@@ -309,8 +308,8 @@ pub(crate) mod tests {
}
}
- fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
- Ok(Some(vec![Some(true)]))
+ fn monotonicity(&self, input: &[ExprProperties]) ->
Result<SortProperties> {
+ Ok(input[0].sort_properties)
}
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs
b/datafusion/physical-plan/src/aggregates/mod.rs
index 95376e7e69..21608db40d 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -2080,7 +2080,7 @@ mod tests {
let col_c = &col("c", &test_schema)?;
let mut eq_properties = EquivalenceProperties::new(test_schema);
// Columns a and b are equal.
- eq_properties.add_equal_conditions(col_a, col_b);
+ eq_properties.add_equal_conditions(col_a, col_b)?;
// Aggregate requirements are
// [None], [a ASC], [a ASC, b ASC, c ASC], [a ASC, b ASC] respectively
let order_by_exprs = vec![
diff --git a/datafusion/physical-plan/src/filter.rs
b/datafusion/physical-plan/src/filter.rs
index bf1ab8b731..6729e3b9e6 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -192,7 +192,7 @@ impl FilterExec {
let mut eq_properties = input.equivalence_properties().clone();
let (equal_pairs, _) = collect_columns_from_predicate(predicate);
for (lhs, rhs) in equal_pairs {
- eq_properties.add_equal_conditions(lhs, rhs)
+ eq_properties.add_equal_conditions(lhs, rhs)?
}
// Add the columns that have only one viable value (singleton) after
// filtering to constants.
@@ -433,13 +433,12 @@ pub type EqualAndNonEqual<'a> =
#[cfg(test)]
mod tests {
-
use super::*;
+ use crate::empty::EmptyExec;
use crate::expressions::*;
use crate::test;
use crate::test::exec::StatisticsExec;
- use crate::empty::EmptyExec;
use arrow::datatypes::{Field, Schema};
use arrow_schema::{UnionFields, UnionMode};
use datafusion_common::ScalarValue;
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs
b/datafusion/proto/src/physical_plan/from_proto.rs
index a290f30586..b7bc60a048 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -354,7 +354,6 @@ pub fn parse_physical_expr(
scalar_fun_def,
args,
convert_required!(e.return_type)?,
- None,
))
}
ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index dd8e450d31..79abecf556 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -624,7 +624,6 @@ fn roundtrip_scalar_udf() -> Result<()> {
fun_def,
vec![col("a", &schema)?],
DataType::Int64,
- None,
);
let project =
@@ -752,7 +751,6 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> {
Arc::new(udf.clone()),
vec![col("text", &schema)?],
DataType::Int64,
- None,
));
let filter = Arc::new(FilterExec::try_new(
diff --git a/datafusion/sqllogictest/test_files/order.slt
b/datafusion/sqllogictest/test_files/order.slt
index 0f869fc0b4..fb07d5ebe8 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -955,3 +955,154 @@ drop table foo;
statement ok
drop table ambiguity_test;
+
+# Casting from numeric to string types breaks the ordering, since strings sort lexicographically (e.g. '10' < '2')
+statement ok
+CREATE EXTERNAL TABLE ordered_table (
+ a0 INT,
+ a INT,
+ b INT,
+ c INT,
+ d INT
+)
+STORED AS CSV
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv'
+OPTIONS ('format.has_header' 'true');
+
+query T
+SELECT CAST(c as VARCHAR) as c_str
+FROM ordered_table
+ORDER BY c_str
+limit 5;
+----
+0
+1
+10
+11
+12
+
+query TT
+EXPLAIN SELECT CAST(c as VARCHAR) as c_str
+FROM ordered_table
+ORDER BY c_str
+limit 5;
+----
+logical_plan
+01)Limit: skip=0, fetch=5
+02)--Sort: c_str ASC NULLS LAST, fetch=5
+03)----Projection: CAST(ordered_table.c AS Utf8) AS c_str
+04)------TableScan: ordered_table projection=[c]
+physical_plan
+01)GlobalLimitExec: skip=0, fetch=5
+02)--SortPreservingMergeExec: [c_str@0 ASC NULLS LAST], fetch=5
+03)----SortExec: TopK(fetch=5), expr=[c_str@0 ASC NULLS LAST],
preserve_partitioning=[true]
+04)------ProjectionExec: expr=[CAST(c@0 AS Utf8) as c_str]
+05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+06)----------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c],
output_ordering=[c@0 ASC NULLS LAST], has_header=true
+
+
+# Casting from numeric to numeric types preserves the ordering
+query I
+SELECT CAST(c as BIGINT) as c_bigint
+FROM ordered_table
+ORDER BY c_bigint
+limit 5;
+----
+0
+1
+2
+3
+4
+
+query TT
+EXPLAIN SELECT CAST(c as BIGINT) as c_bigint
+FROM ordered_table
+ORDER BY c_bigint
+limit 5;
+----
+logical_plan
+01)Limit: skip=0, fetch=5
+02)--Sort: c_bigint ASC NULLS LAST, fetch=5
+03)----Projection: CAST(ordered_table.c AS Int64) AS c_bigint
+04)------TableScan: ordered_table projection=[c]
+physical_plan
+01)GlobalLimitExec: skip=0, fetch=5
+02)--SortPreservingMergeExec: [c_bigint@0 ASC NULLS LAST], fetch=5
+03)----ProjectionExec: expr=[CAST(c@0 AS Int64) as c_bigint]
+04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c],
output_ordering=[c@0 ASC NULLS LAST], has_header=true
+
+statement ok
+drop table ordered_table;
+
+
+# ABS(x) breaks the ordering if x's range contains both negative and positive values.
+# Since x is defined as INT, its range is assumed to be from NEG_INF to INF.
+statement ok
+CREATE EXTERNAL TABLE ordered_table (
+ a0 INT,
+ a INT,
+ b INT,
+ c INT,
+ d INT
+)
+STORED AS CSV
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv'
+OPTIONS ('format.has_header' 'true');
+
+query TT
+EXPLAIN SELECT ABS(c) as abs_c
+FROM ordered_table
+ORDER BY abs_c
+limit 5;
+----
+logical_plan
+01)Limit: skip=0, fetch=5
+02)--Sort: abs_c ASC NULLS LAST, fetch=5
+03)----Projection: abs(ordered_table.c) AS abs_c
+04)------TableScan: ordered_table projection=[c]
+physical_plan
+01)GlobalLimitExec: skip=0, fetch=5
+02)--SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5
+03)----SortExec: TopK(fetch=5), expr=[abs_c@0 ASC NULLS LAST],
preserve_partitioning=[true]
+04)------ProjectionExec: expr=[abs(c@0) as abs_c]
+05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+06)----------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c],
output_ordering=[c@0 ASC NULLS LAST], has_header=true
+
+statement ok
+drop table ordered_table;
+
+# ABS(x) preserves the ordering if x's range contains only non-negative values.
+# Since x is defined as INT UNSIGNED, its range is assumed to be from 0 to INF.
+statement ok
+CREATE EXTERNAL TABLE ordered_table (
+ a0 INT,
+ a INT,
+ b INT,
+ c INT UNSIGNED,
+ d INT
+)
+STORED AS CSV
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv'
+OPTIONS ('format.has_header' 'true');
+
+query TT
+EXPLAIN SELECT ABS(c) as abs_c
+FROM ordered_table
+ORDER BY abs_c
+limit 5;
+----
+logical_plan
+01)Limit: skip=0, fetch=5
+02)--Sort: abs_c ASC NULLS LAST, fetch=5
+03)----Projection: abs(ordered_table.c) AS abs_c
+04)------TableScan: ordered_table projection=[c]
+physical_plan
+01)GlobalLimitExec: skip=0, fetch=5
+02)--SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5
+03)----ProjectionExec: expr=[abs(c@0) as abs_c]
+04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c],
output_ordering=[c@0 ASC NULLS LAST], has_header=true
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]