This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d2fb05ed5b PhysicalExpr Orderings with Range Information (#10504)
d2fb05ed5b is described below

commit d2fb05ed5ba71fd0f1d440baca12897413c2a8af
Author: Berkay Şahin <[email protected]>
AuthorDate: Fri May 17 14:08:01 2024 +0300

    PhysicalExpr Orderings with Range Information (#10504)
    
    * Self review
    
    * Fix null interval accumulation
    
    * Refactor monotonicity
    
    * Ignore failing tests
    
    * Initial impl
    
    * Ready for review
    
    * Update properties.rs
    
    * Update configs.md
    
    Update configs.md
    
    * cargo doc
    
    * Add abs test
    
    * Update properties.rs
    
    * Update udf.rs
    
    * Review Part 1
    
    * Review Part 2
    
    * Minor
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion-examples/examples/advanced_udf.rs       |  28 +--
 datafusion-examples/examples/function_factory.rs   |  13 +-
 .../src/physical_optimizer/enforce_distribution.rs |   8 +-
 .../core/src/physical_optimizer/join_selection.rs  |   4 +-
 .../src/physical_optimizer/projection_pushdown.rs  |   4 -
 .../fuzz_cases/sort_preserving_repartition_fuzz.rs |   6 +-
 .../user_defined/user_defined_scalar_functions.rs  |  15 +-
 datafusion/expr/src/interval_arithmetic.rs         | 104 +++++++--
 datafusion/expr/src/lib.rs                         |   4 +-
 datafusion/expr/src/signature.rs                   |   8 -
 .../src/sort_properties.rs                         |  59 +++--
 datafusion/expr/src/udf.rs                         | 132 +++++++++--
 datafusion/functions/src/datetime/date_bin.rs      |  28 ++-
 datafusion/functions/src/datetime/date_trunc.rs    |  33 ++-
 datafusion/functions/src/macros.rs                 |  36 +--
 datafusion/functions/src/math/abs.rs               |  43 ++--
 datafusion/functions/src/math/log.rs               |  46 ++--
 datafusion/functions/src/math/mod.rs               |  72 +++---
 datafusion/functions/src/math/monotonicity.rs      | 241 +++++++++++++++++++++
 datafusion/functions/src/math/pi.rs                |  10 +-
 datafusion/functions/src/math/round.rs             |  27 ++-
 datafusion/functions/src/math/trunc.rs             |  27 ++-
 datafusion/physical-expr-common/src/lib.rs         |   1 -
 .../physical-expr-common/src/physical_expr.rs      |  24 +-
 datafusion/physical-expr-common/src/utils.rs       |  31 ++-
 datafusion/physical-expr/src/equivalence/mod.rs    |  15 +-
 .../physical-expr/src/equivalence/ordering.rs      |  26 +--
 .../physical-expr/src/equivalence/projection.rs    |  25 +--
 .../physical-expr/src/equivalence/properties.rs    | 223 +++++++++++--------
 datafusion/physical-expr/src/expressions/binary.rs |  57 +++--
 datafusion/physical-expr/src/expressions/cast.rs   |  28 ++-
 .../physical-expr/src/expressions/literal.rs       |  11 +-
 .../physical-expr/src/expressions/negative.rs      |   9 +-
 datafusion/physical-expr/src/functions.rs          |   8 +-
 datafusion/physical-expr/src/lib.rs                |   7 -
 datafusion/physical-expr/src/scalar_function.rs    | 114 +++-------
 datafusion/physical-expr/src/utils/mod.rs          |  11 +-
 datafusion/physical-plan/src/aggregates/mod.rs     |   2 +-
 datafusion/physical-plan/src/filter.rs             |   5 +-
 datafusion/proto/src/physical_plan/from_proto.rs   |   1 -
 .../proto/tests/cases/roundtrip_physical_plan.rs   |   2 -
 datafusion/sqllogictest/test_files/order.slt       | 151 +++++++++++++
 42 files changed, 1196 insertions(+), 503 deletions(-)

diff --git a/datafusion-examples/examples/advanced_udf.rs 
b/datafusion-examples/examples/advanced_udf.rs
index c8063c0eb1..d1ef1c6c9d 100644
--- a/datafusion-examples/examples/advanced_udf.rs
+++ b/datafusion-examples/examples/advanced_udf.rs
@@ -15,26 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::{
-    arrow::{
-        array::{ArrayRef, Float32Array, Float64Array},
-        datatypes::DataType,
-        record_batch::RecordBatch,
-    },
-    logical_expr::Volatility,
-};
 use std::any::Any;
+use std::sync::Arc;
 
-use arrow::array::{new_null_array, Array, AsArray};
+use arrow::array::{
+    new_null_array, Array, ArrayRef, AsArray, Float32Array, Float64Array,
+};
 use arrow::compute;
-use arrow::datatypes::Float64Type;
+use arrow::datatypes::{DataType, Float64Type};
+use arrow::record_batch::RecordBatch;
 use datafusion::error::Result;
+use datafusion::logical_expr::Volatility;
 use datafusion::prelude::*;
 use datafusion_common::{internal_err, ScalarValue};
-use datafusion_expr::{
-    ColumnarValue, FuncMonotonicity, ScalarUDF, ScalarUDFImpl, Signature,
-};
-use std::sync::Arc;
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature};
 
 /// This example shows how to use the full ScalarUDFImpl API to implement a 
user
 /// defined function. As in the `simple_udf.rs` example, this struct implements
@@ -186,8 +181,9 @@ impl ScalarUDFImpl for PowUdf {
         &self.aliases
     }
 
-    fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
-        Ok(Some(vec![Some(true)]))
+    fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties> 
{
+        // The POW function preserves the order of its argument.
+        Ok(input[0].sort_properties)
     }
 }
 
diff --git a/datafusion-examples/examples/function_factory.rs 
b/datafusion-examples/examples/function_factory.rs
index 3973e50474..9e624b6629 100644
--- a/datafusion-examples/examples/function_factory.rs
+++ b/datafusion-examples/examples/function_factory.rs
@@ -15,17 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::result::Result as RResult;
+use std::sync::Arc;
+
 use datafusion::error::Result;
 use datafusion::execution::context::{
     FunctionFactory, RegisterFunction, SessionContext, SessionState,
 };
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{exec_err, internal_err, DataFusionError};
-use datafusion_expr::simplify::ExprSimplifyResult;
-use datafusion_expr::simplify::SimplifyInfo;
+use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
 use datafusion_expr::{CreateFunction, Expr, ScalarUDF, ScalarUDFImpl, 
Signature};
-use std::result::Result as RResult;
-use std::sync::Arc;
 
 /// This example shows how to utilize [FunctionFactory] to implement simple
 /// SQL-macro like functions using a `CREATE FUNCTION` statement. The same
@@ -156,8 +157,8 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
         &[]
     }
 
-    fn monotonicity(&self) -> 
Result<Option<datafusion_expr::FuncMonotonicity>> {
-        Ok(None)
+    fn monotonicity(&self, _input: &[ExprProperties]) -> 
Result<SortProperties> {
+        Ok(SortProperties::Unordered)
     }
 }
 
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index c07f2c5dcf..cd84e911d3 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -3572,7 +3572,11 @@ pub(crate) mod tests {
             expr: col("c", &schema).unwrap(),
             options: SortOptions::default(),
         }];
-        let alias = vec![("a".to_string(), "a".to_string())];
+        let alias = vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "b".to_string()),
+            ("c".to_string(), "c".to_string()),
+        ];
         let plan = sort_preserving_merge_exec(
             sort_key.clone(),
             sort_exec(
@@ -3585,7 +3589,7 @@ pub(crate) mod tests {
         let expected = &[
             "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
             // Since this projection is trivial, increasing parallelism is not 
beneficial
-            "ProjectionExec: expr=[a@0 as a]",
+            "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
             "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e]",
         ];
         assert_optimized!(expected, plan.clone(), true);
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs 
b/datafusion/core/src/physical_optimizer/join_selection.rs
index 042a0198bf..135a59aa03 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -39,8 +39,8 @@ use crate::physical_plan::{ExecutionPlan, 
ExecutionPlanProperties};
 use arrow_schema::Schema;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{internal_err, JoinSide, JoinType};
+use datafusion_expr::sort_properties::SortProperties;
 use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::sort_properties::SortProperties;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 
 /// The [`JoinSelection`] rule tries to modify a given plan so that it can
@@ -561,7 +561,7 @@ fn hash_join_convert_symmetric_subrule(
                                 let name = schema.field(*index).name();
                                 let col = Arc::new(Column::new(name, *index)) 
as _;
                                 // Check if the column is ordered.
-                                equivalence.get_expr_ordering(col).data
+                                
equivalence.get_expr_properties(col).sort_properties
                                     != SortProperties::Unordered
                             },
                         )
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs 
b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 0190f35cc9..fe1290e407 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1376,7 +1376,6 @@ mod tests {
                     )),
                 ],
                 DataType::Int32,
-                None,
             )),
             Arc::new(CaseExpr::try_new(
                 Some(Arc::new(Column::new("d", 2))),
@@ -1442,7 +1441,6 @@ mod tests {
                     )),
                 ],
                 DataType::Int32,
-                None,
             )),
             Arc::new(CaseExpr::try_new(
                 Some(Arc::new(Column::new("d", 3))),
@@ -1511,7 +1509,6 @@ mod tests {
                     )),
                 ],
                 DataType::Int32,
-                None,
             )),
             Arc::new(CaseExpr::try_new(
                 Some(Arc::new(Column::new("d", 2))),
@@ -1577,7 +1574,6 @@ mod tests {
                     )),
                 ],
                 DataType::Int32,
-                None,
             )),
             Arc::new(CaseExpr::try_new(
                 Some(Arc::new(Column::new("d_new", 3))),
diff --git 
a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
index 6c9c3359eb..21ef8a7c21 100644
--- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
@@ -39,12 +39,12 @@ mod sp_repartition_fuzz_tests {
         config::SessionConfig, memory_pool::MemoryConsumer, 
SendableRecordBatchStream,
     };
     use datafusion_physical_expr::{
+        equivalence::{EquivalenceClass, EquivalenceProperties},
         expressions::{col, Column},
-        EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+        PhysicalExpr, PhysicalSortExpr,
     };
     use test_utils::add_empty_batches;
 
-    use datafusion_physical_expr::equivalence::EquivalenceClass;
     use itertools::izip;
     use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
 
@@ -78,7 +78,7 @@ mod sp_repartition_fuzz_tests {
 
         let mut eq_properties = 
EquivalenceProperties::new(test_schema.clone());
         // Define a and f are aliases
-        eq_properties.add_equal_conditions(col_a, col_f);
+        eq_properties.add_equal_conditions(col_a, col_f)?;
         // Column e has constant value.
         eq_properties = eq_properties.add_constants([col_e.clone()]);
 
diff --git 
a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs 
b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
index def9fcb4c6..df41cab7bf 100644
--- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
@@ -15,26 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::any::Any;
+use std::sync::Arc;
+
 use arrow::compute::kernels::numeric::add;
 use arrow_array::{ArrayRef, Float32Array, Float64Array, Int32Array, 
RecordBatch};
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::execution::context::{FunctionFactory, RegisterFunction, 
SessionState};
 use datafusion::prelude::*;
 use datafusion::{execution::registry::FunctionRegistry, test_util};
+use datafusion_common::cast::{as_float64_array, as_int32_array};
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{
-    assert_batches_eq, assert_batches_sorted_eq, cast::as_float64_array,
-    cast::as_int32_array, not_impl_err, plan_err, ExprSchema, Result, 
ScalarValue,
+    assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err, 
internal_err,
+    not_impl_err, plan_err, DataFusionError, ExprSchema, Result, ScalarValue,
 };
-use datafusion_common::{assert_contains, exec_err, internal_err, 
DataFusionError};
 use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
 use datafusion_expr::{
     Accumulator, ColumnarValue, CreateFunction, ExprSchemable, 
LogicalPlanBuilder,
     ScalarUDF, ScalarUDFImpl, Signature, Volatility,
 };
-use std::any::Any;
-use std::sync::Arc;
 
 /// test that casting happens on udfs.
 /// c11 is f32, but `custom_sqrt` requires f64. Casting happens but the 
logical plan and
@@ -776,10 +777,6 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
     fn aliases(&self) -> &[String] {
         &[]
     }
-
-    fn monotonicity(&self) -> 
Result<Option<datafusion_expr::FuncMonotonicity>> {
-        Ok(None)
-    }
 }
 
 impl ScalarFunctionWrapper {
diff --git a/datafusion/expr/src/interval_arithmetic.rs 
b/datafusion/expr/src/interval_arithmetic.rs
index ca91a8c9da..c4890b97e7 100644
--- a/datafusion/expr/src/interval_arithmetic.rs
+++ b/datafusion/expr/src/interval_arithmetic.rs
@@ -273,19 +273,34 @@ impl Interval {
                 unreachable!();
             };
             // Standardize boolean interval endpoints:
-            Self {
+            return Self {
                 lower: ScalarValue::Boolean(Some(lower_bool.unwrap_or(false))),
                 upper: ScalarValue::Boolean(Some(upper_bool.unwrap_or(true))),
-            }
+            };
         }
-        // Standardize floating-point endpoints:
-        else if lower.data_type() == DataType::Float32 {
-            handle_float_intervals!(Float32, f32, lower, upper)
-        } else if lower.data_type() == DataType::Float64 {
-            handle_float_intervals!(Float64, f64, lower, upper)
-        } else {
+        match lower.data_type() {
+            // Standardize floating-point endpoints:
+            DataType::Float32 => handle_float_intervals!(Float32, f32, lower, 
upper),
+            DataType::Float64 => handle_float_intervals!(Float64, f64, lower, 
upper),
+            // Unsigned null values for lower bounds are set to zero:
+            DataType::UInt8 if lower.is_null() => Self {
+                lower: ScalarValue::UInt8(Some(0)),
+                upper,
+            },
+            DataType::UInt16 if lower.is_null() => Self {
+                lower: ScalarValue::UInt16(Some(0)),
+                upper,
+            },
+            DataType::UInt32 if lower.is_null() => Self {
+                lower: ScalarValue::UInt32(Some(0)),
+                upper,
+            },
+            DataType::UInt64 if lower.is_null() => Self {
+                lower: ScalarValue::UInt64(Some(0)),
+                upper,
+            },
             // Other data types do not require standardization:
-            Self { lower, upper }
+            _ => Self { lower, upper },
         }
     }
 
@@ -299,6 +314,12 @@ impl Interval {
         Self::try_new(ScalarValue::from(lower), ScalarValue::from(upper))
     }
 
+    /// Creates a singleton zero interval if the datatype is supported.
+    pub fn make_zero(data_type: &DataType) -> Result<Self> {
+        let zero_endpoint = ScalarValue::new_zero(data_type)?;
+        Ok(Self::new(zero_endpoint.clone(), zero_endpoint))
+    }
+
     /// Creates an unbounded interval from both sides if the datatype 
supported.
     pub fn make_unbounded(data_type: &DataType) -> Result<Self> {
         let unbounded_endpoint = ScalarValue::try_from(data_type)?;
@@ -369,7 +390,7 @@ impl Interval {
     /// NOTE: This function only works with intervals of the same data type.
     ///       Attempting to compare intervals of different data types will lead
     ///       to an error.
-    pub(crate) fn gt<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+    pub fn gt<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
         let rhs = other.borrow();
         if self.data_type().ne(&rhs.data_type()) {
             internal_err!(
@@ -402,7 +423,7 @@ impl Interval {
     /// NOTE: This function only works with intervals of the same data type.
     ///       Attempting to compare intervals of different data types will lead
     ///       to an error.
-    pub(crate) fn gt_eq<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+    pub fn gt_eq<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
         let rhs = other.borrow();
         if self.data_type().ne(&rhs.data_type()) {
             internal_err!(
@@ -435,7 +456,7 @@ impl Interval {
     /// NOTE: This function only works with intervals of the same data type.
     ///       Attempting to compare intervals of different data types will lead
     ///       to an error.
-    pub(crate) fn lt<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+    pub fn lt<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
         other.borrow().gt(self)
     }
 
@@ -446,7 +467,7 @@ impl Interval {
     /// NOTE: This function only works with intervals of the same data type.
     ///       Attempting to compare intervals of different data types will lead
     ///       to an error.
-    pub(crate) fn lt_eq<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+    pub fn lt_eq<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
         other.borrow().gt_eq(self)
     }
 
@@ -457,7 +478,7 @@ impl Interval {
     /// NOTE: This function only works with intervals of the same data type.
     ///       Attempting to compare intervals of different data types will lead
     ///       to an error.
-    pub(crate) fn equal<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+    pub fn equal<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
         let rhs = other.borrow();
         if get_result_type(&self.data_type(), &Operator::Eq, 
&rhs.data_type()).is_err() {
             internal_err!(
@@ -480,7 +501,7 @@ impl Interval {
 
     /// Compute the logical conjunction of this (boolean) interval with the
     /// given boolean interval.
-    pub(crate) fn and<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+    pub fn and<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
         let rhs = other.borrow();
         match (&self.lower, &self.upper, &rhs.lower, &rhs.upper) {
             (
@@ -501,8 +522,31 @@ impl Interval {
         }
     }
 
+    /// Compute the logical disjunction of this boolean interval with the
+    /// given boolean interval.
+    pub fn or<T: Borrow<Self>>(&self, other: T) -> Result<Self> {
+        let rhs = other.borrow();
+        match (&self.lower, &self.upper, &rhs.lower, &rhs.upper) {
+            (
+                &ScalarValue::Boolean(Some(self_lower)),
+                &ScalarValue::Boolean(Some(self_upper)),
+                &ScalarValue::Boolean(Some(other_lower)),
+                &ScalarValue::Boolean(Some(other_upper)),
+            ) => {
+                let lower = self_lower || other_lower;
+                let upper = self_upper || other_upper;
+
+                Ok(Self {
+                    lower: ScalarValue::Boolean(Some(lower)),
+                    upper: ScalarValue::Boolean(Some(upper)),
+                })
+            }
+            _ => internal_err!("Incompatible data types for logical 
conjunction"),
+        }
+    }
+
     /// Compute the logical negation of this (boolean) interval.
-    pub(crate) fn not(&self) -> Result<Self> {
+    pub fn not(&self) -> Result<Self> {
         if self.data_type().ne(&DataType::Boolean) {
             internal_err!("Cannot apply logical negation to a non-boolean 
interval")
         } else if self == &Self::CERTAINLY_TRUE {
@@ -761,6 +805,18 @@ impl Interval {
         }
         .map(|result| result + 1)
     }
+
+    /// Reflects an [`Interval`] around the point zero.
+    ///
+    /// This method computes the arithmetic negation of the interval, 
reflecting
+    /// it about the origin of the number line. This operation swaps and 
negates
+    /// the lower and upper bounds of the interval.
+    pub fn arithmetic_negate(self) -> Result<Self> {
+        Ok(Self {
+            lower: self.upper().clone().arithmetic_negate()?,
+            upper: self.lower().clone().arithmetic_negate()?,
+        })
+    }
 }
 
 impl Display for Interval {
@@ -1885,10 +1941,10 @@ mod tests {
 
         let unbounded_cases = vec![
             (DataType::Boolean, Boolean(Some(false)), Boolean(Some(true))),
-            (DataType::UInt8, UInt8(None), UInt8(None)),
-            (DataType::UInt16, UInt16(None), UInt16(None)),
-            (DataType::UInt32, UInt32(None), UInt32(None)),
-            (DataType::UInt64, UInt64(None), UInt64(None)),
+            (DataType::UInt8, UInt8(Some(0)), UInt8(None)),
+            (DataType::UInt16, UInt16(Some(0)), UInt16(None)),
+            (DataType::UInt32, UInt32(Some(0)), UInt32(None)),
+            (DataType::UInt64, UInt64(Some(0)), UInt64(None)),
             (DataType::Int8, Int8(None), Int8(None)),
             (DataType::Int16, Int16(None), Int16(None)),
             (DataType::Int32, Int32(None), Int32(None)),
@@ -1994,6 +2050,10 @@ mod tests {
                 Interval::make(None, Some(1000_i64))?,
                 Interval::make(Some(1000_i64), Some(1500_i64))?,
             ),
+            (
+                Interval::make(Some(0_u8), Some(0_u8))?,
+                Interval::make::<u8>(None, None)?,
+            ),
             (
                 Interval::try_new(
                     prev_value(ScalarValue::Float32(Some(0.0_f32))),
@@ -2036,6 +2096,10 @@ mod tests {
                 Interval::make(Some(-1000_i64), Some(1000_i64))?,
                 Interval::make(None, Some(-1500_i64))?,
             ),
+            (
+                Interval::make::<u64>(None, None)?,
+                Interval::make(Some(0_u64), Some(0_u64))?,
+            ),
             (
                 Interval::make(Some(0.0_f32), Some(0.0_f32))?,
                 Interval::make(Some(0.0_f32), Some(0.0_f32))?,
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index e2b68388ab..bac2f9c145 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -50,6 +50,7 @@ pub mod groups_accumulator;
 pub mod interval_arithmetic;
 pub mod logical_plan;
 pub mod simplify;
+pub mod sort_properties;
 pub mod tree_node;
 pub mod type_coercion;
 pub mod utils;
@@ -77,8 +78,7 @@ pub use logical_plan::*;
 pub use operator::Operator;
 pub use partition_evaluator::PartitionEvaluator;
 pub use signature::{
-    ArrayFunctionSignature, FuncMonotonicity, Signature, TypeSignature, 
Volatility,
-    TIMEZONE_WILDCARD,
+    ArrayFunctionSignature, Signature, TypeSignature, Volatility, 
TIMEZONE_WILDCARD,
 };
 pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
 pub use udaf::{AggregateUDF, AggregateUDFImpl};
diff --git a/datafusion/expr/src/signature.rs b/datafusion/expr/src/signature.rs
index 5d925c8605..63b030f0b7 100644
--- a/datafusion/expr/src/signature.rs
+++ b/datafusion/expr/src/signature.rs
@@ -343,14 +343,6 @@ impl Signature {
     }
 }
 
-/// Monotonicity of the `ScalarFunctionExpr` with respect to its arguments.
-/// Each element of this vector corresponds to an argument and indicates 
whether
-/// the function's behavior is monotonic, or non-monotonic/unknown for that 
argument, namely:
-/// - `None` signifies unknown monotonicity or non-monotonicity.
-/// - `Some(true)` indicates that the function is monotonically increasing 
w.r.t. the argument in question.
-/// - Some(false) indicates that the function is monotonically decreasing 
w.r.t. the argument in question.
-pub type FuncMonotonicity = Vec<Option<bool>>;
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/physical-expr-common/src/sort_properties.rs 
b/datafusion/expr/src/sort_properties.rs
similarity index 77%
rename from datafusion/physical-expr-common/src/sort_properties.rs
rename to datafusion/expr/src/sort_properties.rs
index 47a5d5ba5e..7778be2ecf 100644
--- a/datafusion/physical-expr-common/src/sort_properties.rs
+++ b/datafusion/expr/src/sort_properties.rs
@@ -17,9 +17,10 @@
 
 use std::ops::Neg;
 
-use arrow::compute::SortOptions;
+use crate::interval_arithmetic::Interval;
 
-use crate::tree_node::ExprContext;
+use arrow::compute::SortOptions;
+use arrow::datatypes::DataType;
 
 /// To propagate [`SortOptions`] across the `PhysicalExpr`, it is insufficient
 /// to simply use `Option<SortOptions>`: There must be a differentiation 
between
@@ -120,29 +121,39 @@ impl SortProperties {
 impl Neg for SortProperties {
     type Output = Self;
 
-    fn neg(self) -> Self::Output {
-        match self {
-            SortProperties::Ordered(SortOptions {
-                descending,
-                nulls_first,
-            }) => SortProperties::Ordered(SortOptions {
-                descending: !descending,
-                nulls_first,
-            }),
-            SortProperties::Singleton => SortProperties::Singleton,
-            SortProperties::Unordered => SortProperties::Unordered,
+    fn neg(mut self) -> Self::Output {
+        if let SortProperties::Ordered(SortOptions { descending, .. }) = &mut 
self {
+            *descending = !*descending;
         }
+        self
     }
 }
 
-/// The `ExprOrdering` struct is designed to aid in the determination of 
ordering (represented
-/// by [`SortProperties`]) for a given `PhysicalExpr`. When analyzing the 
orderings
-/// of a `PhysicalExpr`, the process begins by assigning the ordering of its 
leaf nodes.
-/// By propagating these leaf node orderings upwards in the expression tree, 
the overall
-/// ordering of the entire `PhysicalExpr` can be derived.
-///
-/// This struct holds the necessary state information for each expression in 
the `PhysicalExpr`.
-/// It encapsulates the orderings (`data`) associated with the expression 
(`expr`), and
-/// orderings of the children expressions (`children`). The [`ExprOrdering`] 
of a parent
-/// expression is determined based on the [`ExprOrdering`] states of its 
children expressions.
-pub type ExprOrdering = ExprContext<SortProperties>;
+/// Represents the properties of a `PhysicalExpr`, including its sorting and 
range attributes.
+#[derive(Debug, Clone)]
+pub struct ExprProperties {
+    pub sort_properties: SortProperties,
+    pub range: Interval,
+}
+
+impl ExprProperties {
+    /// Creates a new `ExprProperties` instance with unknown sort properties 
and unknown range.
+    pub fn new_unknown() -> Self {
+        Self {
+            sort_properties: SortProperties::default(),
+            range: Interval::make_unbounded(&DataType::Null).unwrap(),
+        }
+    }
+
+    /// Sets the sorting properties of the expression and returns the modified 
instance.
+    pub fn with_order(mut self, order: SortProperties) -> Self {
+        self.sort_properties = order;
+        self
+    }
+
+    /// Sets the range of the expression and returns the modified instance.
+    pub fn with_range(mut self, range: Interval) -> Self {
+        self.range = range;
+        self
+    }
+}
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index fadea26e7f..921d13ab35 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -17,19 +17,20 @@
 
 //! [`ScalarUDF`]: Scalar User Defined Functions
 
+use std::any::Any;
+use std::fmt::{self, Debug, Formatter};
+use std::sync::Arc;
+
 use crate::expr::create_name;
+use crate::interval_arithmetic::Interval;
 use crate::simplify::{ExprSimplifyResult, SimplifyInfo};
+use crate::sort_properties::{ExprProperties, SortProperties};
 use crate::{
-    ColumnarValue, Expr, FuncMonotonicity, ReturnTypeFunction,
-    ScalarFunctionImplementation, Signature,
+    ColumnarValue, Expr, ReturnTypeFunction, ScalarFunctionImplementation, 
Signature,
 };
+
 use arrow::datatypes::DataType;
 use datafusion_common::{not_impl_err, ExprSchema, Result};
-use std::any::Any;
-use std::fmt;
-use std::fmt::Debug;
-use std::fmt::Formatter;
-use std::sync::Arc;
 
 /// Logical representation of a Scalar User Defined Function.
 ///
@@ -202,18 +203,63 @@ impl ScalarUDF {
         Arc::new(move |args| captured.invoke(args))
     }
 
-    /// This function specifies monotonicity behaviors for User defined scalar 
functions.
-    ///
-    /// See [`ScalarUDFImpl::monotonicity`] for more details.
-    pub fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
-        self.inner.monotonicity()
-    }
-
     /// Get the circuits of inner implementation
     pub fn short_circuits(&self) -> bool {
         self.inner.short_circuits()
     }
 
+    /// Computes the output interval for a [`ScalarUDF`], given the input
+    /// intervals.
+    ///
+    /// # Parameters
+    ///
+    /// * `inputs` are the intervals for the inputs (children) of this 
function.
+    ///
+    /// # Example
+    ///
+    /// If the function is `ABS(a)`, and the input interval is `a: [-3, 2]`,
+    /// then the output interval would be `[0, 3]`.
+    pub fn evaluate_bounds(&self, inputs: &[&Interval]) -> Result<Interval> {
+        self.inner.evaluate_bounds(inputs)
+    }
+
+    /// Updates bounds for child expressions, given a known interval for this
+    /// function. This is used to propagate constraints down through an 
expression
+    /// tree.
+    ///
+    /// # Parameters
+    ///
+    /// * `interval` is the currently known interval for this function.
+    /// * `inputs` are the current intervals for the inputs (children) of this 
function.
+    ///
+    /// # Returns
+    ///
+    /// A `Vec` of new intervals for the children, in order.
+    ///
+    /// If constraint propagation reveals an infeasibility for any child, 
returns
+    /// [`None`]. If none of the children intervals change as a result of
+    /// propagation, may return an empty vector instead of cloning `children`.
+    /// This is the default (and conservative) return value.
+    ///
+    /// # Example
+    ///
+    /// If the function is `ABS(a)`, the current `interval` is `[4, 5]` and the
+    /// input `a` is given as `[-7, -6]`, then propagation would return
+    /// `[-5, 5]`.
+    pub fn propagate_constraints(
+        &self,
+        interval: &Interval,
+        inputs: &[&Interval],
+    ) -> Result<Option<Vec<Interval>>> {
+        self.inner.propagate_constraints(interval, inputs)
+    }
+
+    /// Calculates the [`SortProperties`] of this function based on its
+    /// children's properties.
+    pub fn monotonicity(&self, inputs: &[ExprProperties]) -> 
Result<SortProperties> {
+        self.inner.monotonicity(inputs)
+    }
+
     /// See [`ScalarUDFImpl::coerce_types`] for more details.
     pub fn coerce_types(&self, arg_types: &[DataType]) -> 
Result<Vec<DataType>> {
         self.inner.coerce_types(arg_types)
@@ -387,11 +433,6 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
         &[]
     }
 
-    /// This function specifies monotonicity behaviors for User defined scalar 
functions.
-    fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
-        Ok(None)
-    }
-
     /// Optionally apply per-UDF simplification / rewrite rules.
     ///
     /// This can be used to apply function specific simplification rules during
@@ -426,6 +467,59 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
         false
     }
 
+    /// Computes the output interval for a [`ScalarUDFImpl`], given the input
+    /// intervals.
+    ///
+    /// # Parameters
+    ///
+    /// * `inputs` are the intervals for the inputs (children) of this 
function.
+    ///
+    /// # Example
+    ///
+    /// If the function is `ABS(a)`, and the input interval is `a: [-3, 2]`,
+    /// then the output interval would be `[0, 3]`.
+    fn evaluate_bounds(&self, _input: &[&Interval]) -> Result<Interval> {
+        // We cannot assume the input data type is the same as the output type.
+        Interval::make_unbounded(&DataType::Null)
+    }
+
+    /// Updates bounds for child expressions, given a known interval for this
+    /// function. This is used to propagate constraints down through an 
expression
+    /// tree.
+    ///
+    /// # Parameters
+    ///
+    /// * `interval` is the currently known interval for this function.
+    /// * `inputs` are the current intervals for the inputs (children) of this 
function.
+    ///
+    /// # Returns
+    ///
+    /// A `Vec` of new intervals for the children, in order.
+    ///
+    /// If constraint propagation reveals an infeasibility for any child, 
returns
+    /// [`None`]. If none of the children intervals change as a result of
+    /// propagation, may return an empty vector instead of cloning `inputs`.
+    /// This is the default (and conservative) return value.
+    ///
+    /// # Example
+    ///
+    /// If the function is `ABS(a)`, the current `interval` is `[4, 5]` and the
+    /// input `a` is given as `[-7, 3]`, then propagation would return
+    /// `[-5, 3]`.
+    fn propagate_constraints(
+        &self,
+        _interval: &Interval,
+        _inputs: &[&Interval],
+    ) -> Result<Option<Vec<Interval>>> {
+        Ok(Some(vec![]))
+    }
+
+    /// Calculates the [`SortProperties`] of this function based on its
+    /// children's properties.
+    fn monotonicity(&self, _inputs: &[ExprProperties]) -> 
Result<SortProperties> {
+        Ok(SortProperties::Unordered)
+    }
+
     /// Coerce arguments of a function call to types that the function can 
evaluate.
     ///
     /// This function is only called if [`ScalarUDFImpl::signature`] returns 
[`crate::TypeSignature::UserDefined`]. Most
diff --git a/datafusion/functions/src/datetime/date_bin.rs 
b/datafusion/functions/src/datetime/date_bin.rs
index da1797cdae..51f5c09a06 100644
--- a/datafusion/functions/src/datetime/date_bin.rs
+++ b/datafusion/functions/src/datetime/date_bin.rs
@@ -29,16 +29,17 @@ use arrow::datatypes::DataType::{Null, Timestamp, Utf8};
 use arrow::datatypes::IntervalUnit::{DayTime, MonthDayNano};
 use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
 use arrow::datatypes::{DataType, TimeUnit};
-use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
 
 use datafusion_common::cast::as_primitive_array;
 use datafusion_common::{exec_err, not_impl_err, plan_err, Result, ScalarValue};
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
 use datafusion_expr::TypeSignature::Exact;
 use datafusion_expr::{
-    ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility,
-    TIMEZONE_WILDCARD,
+    ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
 };
 
+use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
+
 #[derive(Debug)]
 pub struct DateBinFunc {
     signature: Signature,
@@ -146,8 +147,21 @@ impl ScalarUDFImpl for DateBinFunc {
         }
     }
 
-    fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
-        Ok(Some(vec![None, Some(true)]))
+    fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties> 
{
+        // The DATE_BIN function preserves the order of its second argument.
+        let step = &input[0];
+        let date_value = &input[1];
+        let reference = input.get(2);
+
+        if step.sort_properties.eq(&SortProperties::Singleton)
+            && reference
+                .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
+                .unwrap_or(true)
+        {
+            Ok(date_value.sort_properties)
+        } else {
+            Ok(SortProperties::Unordered)
+        }
     }
 }
 
@@ -425,16 +439,16 @@ fn date_bin_impl(
 mod tests {
     use std::sync::Arc;
 
+    use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc};
     use arrow::array::types::TimestampNanosecondType;
     use arrow::array::{IntervalDayTimeArray, TimestampNanosecondArray};
     use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
     use arrow::datatypes::{DataType, TimeUnit};
-    use chrono::TimeDelta;
 
     use datafusion_common::ScalarValue;
     use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
 
-    use crate::datetime::date_bin::{date_bin_nanos_interval, DateBinFunc};
+    use chrono::TimeDelta;
 
     #[test]
     fn test_date_bin() {
diff --git a/datafusion/functions/src/datetime/date_trunc.rs 
b/datafusion/functions/src/datetime/date_trunc.rs
index 0414bf9c2a..ba5db567a0 100644
--- a/datafusion/functions/src/datetime/date_trunc.rs
+++ b/datafusion/functions/src/datetime/date_trunc.rs
@@ -29,19 +29,18 @@ use arrow::array::types::{
     TimestampNanosecondType, TimestampSecondType,
 };
 use arrow::array::{Array, PrimitiveArray};
-use arrow::datatypes::DataType::{Null, Timestamp, Utf8};
-use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
-use arrow::datatypes::{DataType, TimeUnit};
-use chrono::{
-    DateTime, Datelike, Duration, LocalResult, NaiveDateTime, Offset, 
TimeDelta, Timelike,
-};
-
+use arrow::datatypes::DataType::{self, Null, Timestamp, Utf8};
+use arrow::datatypes::TimeUnit::{self, Microsecond, Millisecond, Nanosecond, 
Second};
 use datafusion_common::cast::as_primitive_array;
 use datafusion_common::{exec_err, plan_err, DataFusionError, Result, 
ScalarValue};
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
 use datafusion_expr::TypeSignature::Exact;
 use datafusion_expr::{
-    ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility,
-    TIMEZONE_WILDCARD,
+    ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
+};
+
+use chrono::{
+    DateTime, Datelike, Duration, LocalResult, NaiveDateTime, Offset, 
TimeDelta, Timelike,
 };
 
 #[derive(Debug)]
@@ -205,8 +204,16 @@ impl ScalarUDFImpl for DateTruncFunc {
         &self.aliases
     }
 
-    fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
-        Ok(Some(vec![None, Some(true)]))
+    fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties> 
{
+        // The DATE_TRUNC function preserves the order of its second argument.
+        let precision = &input[0];
+        let date_value = &input[1];
+
+        if precision.sort_properties.eq(&SortProperties::Singleton) {
+            Ok(date_value.sort_properties)
+        } else {
+            Ok(SortProperties::Unordered)
+        }
     }
 }
 
@@ -410,7 +417,10 @@ fn parse_tz(tz: &Option<Arc<str>>) -> Result<Option<Tz>> {
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use crate::datetime::date_trunc::{date_trunc_coarse, DateTruncFunc};
+
     use arrow::array::cast::as_primitive_array;
     use arrow::array::types::TimestampNanosecondType;
     use arrow::array::TimestampNanosecondArray;
@@ -418,7 +428,6 @@ mod tests {
     use arrow::datatypes::{DataType, TimeUnit};
     use datafusion_common::ScalarValue;
     use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
-    use std::sync::Arc;
 
     #[test]
     fn date_trunc_test() {
diff --git a/datafusion/functions/src/macros.rs 
b/datafusion/functions/src/macros.rs
index 5ee47bd3e8..2f14e881d1 100644
--- a/datafusion/functions/src/macros.rs
+++ b/datafusion/functions/src/macros.rs
@@ -89,7 +89,6 @@ macro_rules! make_udf_function {
 /// The rationale for providing stub functions is to help users to configure 
datafusion
 /// properly (so they get an error telling them why a function is not 
available)
 /// instead of getting a cryptic "no function found" message at runtime.
-
 macro_rules! make_stub_package {
     ($name:ident, $feature:literal) => {
         #[cfg(not(feature = $feature))]
@@ -115,7 +114,6 @@ macro_rules! make_stub_package {
 /// $ARGS_TYPE: the type of array to cast the argument to
 /// $RETURN_TYPE: the type of array to return
 /// $FUNC: the function to apply to each element of $ARG
-///
 macro_rules! make_function_scalar_inputs_return_type {
     ($ARG: expr, $NAME:expr, $ARG_TYPE:ident, $RETURN_TYPE:ident, $FUNC: 
block) => {{
         let arg = downcast_arg!($ARG, $NAME, $ARG_TYPE);
@@ -162,14 +160,14 @@ macro_rules! make_math_unary_udf {
         make_udf_function!($NAME::$UDF, $GNAME, $NAME);
 
         mod $NAME {
+            use std::any::Any;
+            use std::sync::Arc;
+
             use arrow::array::{ArrayRef, Float32Array, Float64Array};
             use arrow::datatypes::DataType;
             use datafusion_common::{exec_err, DataFusionError, Result};
-            use datafusion_expr::{
-                ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, 
Volatility,
-            };
-            use std::any::Any;
-            use std::sync::Arc;
+            use datafusion_expr::sort_properties::{ExprProperties, 
SortProperties};
+            use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, 
Volatility};
 
             #[derive(Debug)]
             pub struct $UDF {
@@ -211,8 +209,11 @@ macro_rules! make_math_unary_udf {
                     }
                 }
 
-                fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
-                    Ok($MONOTONICITY)
+                fn monotonicity(
+                    &self,
+                    input: &[ExprProperties],
+                ) -> Result<SortProperties> {
+                    $MONOTONICITY(input)
                 }
 
                 fn invoke(&self, args: &[ColumnarValue]) -> 
Result<ColumnarValue> {
@@ -266,15 +267,15 @@ macro_rules! make_math_binary_udf {
         make_udf_function!($NAME::$UDF, $GNAME, $NAME);
 
         mod $NAME {
+            use std::any::Any;
+            use std::sync::Arc;
+
             use arrow::array::{ArrayRef, Float32Array, Float64Array};
             use arrow::datatypes::DataType;
             use datafusion_common::{exec_err, DataFusionError, Result};
+            use datafusion_expr::sort_properties::{ExprProperties, 
SortProperties};
             use datafusion_expr::TypeSignature::*;
-            use datafusion_expr::{
-                ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, 
Volatility,
-            };
-            use std::any::Any;
-            use std::sync::Arc;
+            use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, 
Volatility};
 
             #[derive(Debug)]
             pub struct $UDF {
@@ -318,8 +319,11 @@ macro_rules! make_math_binary_udf {
                     }
                 }
 
-                fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
-                    Ok($MONOTONICITY)
+                fn monotonicity(
+                    &self,
+                    input: &[ExprProperties],
+                ) -> Result<SortProperties> {
+                    $MONOTONICITY(input)
                 }
 
                 fn invoke(&self, args: &[ColumnarValue]) -> 
Result<ColumnarValue> {
diff --git a/datafusion/functions/src/math/abs.rs 
b/datafusion/functions/src/math/abs.rs
index e05dc86652..a752102913 100644
--- a/datafusion/functions/src/math/abs.rs
+++ b/datafusion/functions/src/math/abs.rs
@@ -17,23 +17,20 @@
 
 //! math expressions
 
-use arrow::array::Decimal128Array;
-use arrow::array::Decimal256Array;
-use arrow::array::Int16Array;
-use arrow::array::Int32Array;
-use arrow::array::Int64Array;
-use arrow::array::Int8Array;
-use arrow::datatypes::DataType;
-use datafusion_common::{exec_err, not_impl_err};
-use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::ColumnarValue;
-
-use arrow::array::{ArrayRef, Float32Array, Float64Array};
-use arrow::error::ArrowError;
-use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
 use std::any::Any;
 use std::sync::Arc;
 
+use arrow::array::{
+    ArrayRef, Decimal128Array, Decimal256Array, Float32Array, Float64Array, 
Int16Array,
+    Int32Array, Int64Array, Int8Array,
+};
+use arrow::datatypes::DataType;
+use arrow::error::ArrowError;
+use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+
 type MathArrayFunction = fn(&Vec<ArrayRef>) -> Result<ArrayRef>;
 
 macro_rules! make_abs_function {
@@ -170,7 +167,21 @@ impl ScalarUDFImpl for AbsFunc {
         let input_data_type = args[0].data_type();
         let abs_fun = create_abs_function(input_data_type)?;
 
-        let arr = abs_fun(&args)?;
-        Ok(ColumnarValue::Array(arr))
+        abs_fun(&args).map(ColumnarValue::Array)
+    }
+
+    fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties> 
{
+        // Non-decreasing for x ≥ 0 and symmetrically non-increasing for x ≤ 0.
+        let arg = &input[0];
+        let range = &arg.range;
+        let zero_point = Interval::make_zero(&range.lower().data_type())?;
+
+        if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+            Ok(arg.sort_properties)
+        } else if range.lt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+            Ok(-arg.sort_properties)
+        } else {
+            Ok(SortProperties::Unordered)
+        }
     }
 }
diff --git a/datafusion/functions/src/math/log.rs 
b/datafusion/functions/src/math/log.rs
index e6c698ad1a..8c1e8ac8fe 100644
--- a/datafusion/functions/src/math/log.rs
+++ b/datafusion/functions/src/math/log.rs
@@ -17,6 +17,12 @@
 
 //! Math function: `log()`.
 
+use std::any::Any;
+use std::sync::Arc;
+
+use super::power::PowerFunc;
+
+use arrow::array::{ArrayRef, Float32Array, Float64Array};
 use arrow::datatypes::DataType;
 use datafusion_common::{
     exec_err, internal_err, plan_datafusion_err, plan_err, DataFusionError, 
Result,
@@ -24,15 +30,9 @@ use datafusion_common::{
 };
 use datafusion_expr::expr::ScalarFunction;
 use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
-use datafusion_expr::{lit, ColumnarValue, Expr, FuncMonotonicity, ScalarUDF};
-
-use arrow::array::{ArrayRef, Float32Array, Float64Array};
-use datafusion_expr::TypeSignature::*;
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+use datafusion_expr::{lit, ColumnarValue, Expr, ScalarUDF, TypeSignature::*};
 use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
-use std::any::Any;
-use std::sync::Arc;
-
-use super::power::PowerFunc;
 
 #[derive(Debug)]
 pub struct LogFunc {
@@ -81,8 +81,23 @@ impl ScalarUDFImpl for LogFunc {
         }
     }
 
-    fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
-        Ok(Some(vec![Some(true), Some(false)]))
+    fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties> 
{
+        match (input[0].sort_properties, input[1].sort_properties) {
+            (first @ SortProperties::Ordered(value), 
SortProperties::Ordered(base))
+                if !value.descending && base.descending
+                    || value.descending && !base.descending =>
+            {
+                Ok(first)
+            }
+            (
+                first @ (SortProperties::Ordered(_) | 
SortProperties::Singleton),
+                SortProperties::Singleton,
+            ) => Ok(first),
+            (SortProperties::Singleton, second @ SortProperties::Ordered(_)) 
=> {
+                Ok(-second)
+            }
+            _ => Ok(SortProperties::Unordered),
+        }
     }
 
     // Support overloaded log(base, x) and log(x) which defaults to log(10, x)
@@ -213,14 +228,13 @@ fn is_pow(func: &ScalarUDF) -> bool {
 mod tests {
     use std::collections::HashMap;
 
-    use datafusion_common::{
-        cast::{as_float32_array, as_float64_array},
-        DFSchema,
-    };
-    use datafusion_expr::{execution_props::ExecutionProps, 
simplify::SimplifyContext};
-
     use super::*;
 
+    use datafusion_common::cast::{as_float32_array, as_float64_array};
+    use datafusion_common::DFSchema;
+    use datafusion_expr::execution_props::ExecutionProps;
+    use datafusion_expr::simplify::SimplifyContext;
+
     #[test]
     fn test_log_f64() {
         let args = [
diff --git a/datafusion/functions/src/math/mod.rs 
b/datafusion/functions/src/math/mod.rs
index b6e8d26b64..6c26ce79d0 100644
--- a/datafusion/functions/src/math/mod.rs
+++ b/datafusion/functions/src/math/mod.rs
@@ -17,9 +17,12 @@
 
 //! "math" DataFusion functions
 
-use datafusion_expr::ScalarUDF;
 use std::sync::Arc;
 
+use crate::math::monotonicity::*;
+
+use datafusion_expr::ScalarUDF;
+
 pub mod abs;
 pub mod cot;
 pub mod factorial;
@@ -27,6 +30,7 @@ pub mod gcd;
 pub mod iszero;
 pub mod lcm;
 pub mod log;
+pub mod monotonicity;
 pub mod nans;
 pub mod nanvl;
 pub mod pi;
@@ -37,42 +41,60 @@ pub mod trunc;
 
 // Create UDFs
 make_udf_function!(abs::AbsFunc, ABS, abs);
-make_math_unary_udf!(AcosFunc, ACOS, acos, acos, None);
-make_math_unary_udf!(AcoshFunc, ACOSH, acosh, acosh, Some(vec![Some(true)]));
-make_math_unary_udf!(AsinFunc, ASIN, asin, asin, None);
-make_math_unary_udf!(AsinhFunc, ASINH, asinh, asinh, Some(vec![Some(true)]));
-make_math_unary_udf!(AtanFunc, ATAN, atan, atan, Some(vec![Some(true)]));
-make_math_unary_udf!(AtanhFunc, ATANH, atanh, atanh, Some(vec![Some(true)]));
-make_math_binary_udf!(Atan2, ATAN2, atan2, atan2, Some(vec![Some(true)]));
-make_math_unary_udf!(CbrtFunc, CBRT, cbrt, cbrt, None);
-make_math_unary_udf!(CeilFunc, CEIL, ceil, ceil, Some(vec![Some(true)]));
-make_math_unary_udf!(CosFunc, COS, cos, cos, None);
-make_math_unary_udf!(CoshFunc, COSH, cosh, cosh, None);
+make_math_unary_udf!(AcosFunc, ACOS, acos, acos, super::acos_monotonicity);
+make_math_unary_udf!(AcoshFunc, ACOSH, acosh, acosh, 
super::acosh_monotonicity);
+make_math_unary_udf!(AsinFunc, ASIN, asin, asin, super::asin_monotonicity);
+make_math_unary_udf!(AsinhFunc, ASINH, asinh, asinh, 
super::asinh_monotonicity);
+make_math_unary_udf!(AtanFunc, ATAN, atan, atan, super::atan_monotonicity);
+make_math_unary_udf!(AtanhFunc, ATANH, atanh, atanh, 
super::atanh_monotonicity);
+make_math_binary_udf!(Atan2, ATAN2, atan2, atan2, super::atan2_monotonicity);
+make_math_unary_udf!(CbrtFunc, CBRT, cbrt, cbrt, super::cbrt_monotonicity);
+make_math_unary_udf!(CeilFunc, CEIL, ceil, ceil, super::ceil_monotonicity);
+make_math_unary_udf!(CosFunc, COS, cos, cos, super::cos_monotonicity);
+make_math_unary_udf!(CoshFunc, COSH, cosh, cosh, super::cosh_monotonicity);
 make_udf_function!(cot::CotFunc, COT, cot);
-make_math_unary_udf!(DegreesFunc, DEGREES, degrees, to_degrees, None);
-make_math_unary_udf!(ExpFunc, EXP, exp, exp, Some(vec![Some(true)]));
+make_math_unary_udf!(
+    DegreesFunc,
+    DEGREES,
+    degrees,
+    to_degrees,
+    super::degrees_monotonicity
+);
+make_math_unary_udf!(ExpFunc, EXP, exp, exp, super::exp_monotonicity);
 make_udf_function!(factorial::FactorialFunc, FACTORIAL, factorial);
-make_math_unary_udf!(FloorFunc, FLOOR, floor, floor, Some(vec![Some(true)]));
+make_math_unary_udf!(FloorFunc, FLOOR, floor, floor, 
super::floor_monotonicity);
 make_udf_function!(log::LogFunc, LOG, log);
 make_udf_function!(gcd::GcdFunc, GCD, gcd);
 make_udf_function!(nans::IsNanFunc, ISNAN, isnan);
 make_udf_function!(iszero::IsZeroFunc, ISZERO, iszero);
 make_udf_function!(lcm::LcmFunc, LCM, lcm);
-make_math_unary_udf!(LnFunc, LN, ln, ln, Some(vec![Some(true)]));
-make_math_unary_udf!(Log2Func, LOG2, log2, log2, Some(vec![Some(true)]));
-make_math_unary_udf!(Log10Func, LOG10, log10, log10, Some(vec![Some(true)]));
+make_math_unary_udf!(LnFunc, LN, ln, ln, super::ln_monotonicity);
+make_math_unary_udf!(Log2Func, LOG2, log2, log2, super::log2_monotonicity);
+make_math_unary_udf!(Log10Func, LOG10, log10, log10, 
super::log10_monotonicity);
 make_udf_function!(nanvl::NanvlFunc, NANVL, nanvl);
 make_udf_function!(pi::PiFunc, PI, pi);
 make_udf_function!(power::PowerFunc, POWER, power);
-make_math_unary_udf!(RadiansFunc, RADIANS, radians, to_radians, None);
+make_math_unary_udf!(
+    RadiansFunc,
+    RADIANS,
+    radians,
+    to_radians,
+    super::radians_monotonicity
+);
 make_udf_function!(random::RandomFunc, RANDOM, random);
 make_udf_function!(round::RoundFunc, ROUND, round);
-make_math_unary_udf!(SignumFunc, SIGNUM, signum, signum, None);
-make_math_unary_udf!(SinFunc, SIN, sin, sin, None);
-make_math_unary_udf!(SinhFunc, SINH, sinh, sinh, None);
-make_math_unary_udf!(SqrtFunc, SQRT, sqrt, sqrt, None);
-make_math_unary_udf!(TanFunc, TAN, tan, tan, None);
-make_math_unary_udf!(TanhFunc, TANH, tanh, tanh, None);
+make_math_unary_udf!(
+    SignumFunc,
+    SIGNUM,
+    signum,
+    signum,
+    super::signum_monotonicity
+);
+make_math_unary_udf!(SinFunc, SIN, sin, sin, super::sin_monotonicity);
+make_math_unary_udf!(SinhFunc, SINH, sinh, sinh, super::sinh_monotonicity);
+make_math_unary_udf!(SqrtFunc, SQRT, sqrt, sqrt, super::sqrt_monotonicity);
+make_math_unary_udf!(TanFunc, TAN, tan, tan, super::tan_monotonicity);
+make_math_unary_udf!(TanhFunc, TANH, tanh, tanh, super::tanh_monotonicity);
 make_udf_function!(trunc::TruncFunc, TRUNC, trunc);
 
 pub mod expr_fn {
diff --git a/datafusion/functions/src/math/monotonicity.rs 
b/datafusion/functions/src/math/monotonicity.rs
new file mode 100644
index 0000000000..5ce5654ae7
--- /dev/null
+++ b/datafusion/functions/src/math/monotonicity.rs
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::DataType;
+use datafusion_common::{exec_err, Result, ScalarValue};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+
+fn symmetric_unit_interval(data_type: &DataType) -> Result<Interval> {
+    Interval::try_new(
+        ScalarValue::new_negative_one(data_type)?,
+        ScalarValue::new_one(data_type)?,
+    )
+}
+
+/// Non-increasing on the interval \[−1, 1\], undefined otherwise.
+pub fn acos_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    let arg = &input[0];
+    let range = &arg.range;
+
+    let valid_domain = symmetric_unit_interval(&range.lower().data_type())?;
+
+    if valid_domain.contains(range)? == Interval::CERTAINLY_TRUE {
+        Ok(-arg.sort_properties)
+    } else {
+        exec_err!("Input range of ACOS contains out-of-domain values")
+    }
+}
+
+/// Non-decreasing for x ≥ 1, undefined otherwise.
+pub fn acosh_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    let arg = &input[0];
+    let range = &arg.range;
+
+    let valid_domain = Interval::try_new(
+        ScalarValue::new_one(&range.lower().data_type())?,
+        ScalarValue::try_from(&range.upper().data_type())?,
+    )?;
+
+    if valid_domain.contains(range)? == Interval::CERTAINLY_TRUE {
+        Ok(arg.sort_properties)
+    } else {
+        exec_err!("Input range of ACOSH contains out-of-domain values")
+    }
+}
+
+/// Non-decreasing on the interval \[−1, 1\], undefined otherwise.
+pub fn asin_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    let arg = &input[0];
+    let range = &arg.range;
+
+    let valid_domain = symmetric_unit_interval(&range.lower().data_type())?;
+
+    if valid_domain.contains(range)? == Interval::CERTAINLY_TRUE {
+        Ok(arg.sort_properties)
+    } else {
+        exec_err!("Input range of ASIN contains out-of-domain values")
+    }
+}
+
+/// Non-decreasing for all real numbers.
+pub fn asinh_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn atan_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing on the interval \[−1, 1\], undefined otherwise.
+pub fn atanh_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    let arg = &input[0];
+    let range = &arg.range;
+
+    let valid_domain = symmetric_unit_interval(&range.lower().data_type())?;
+
+    if valid_domain.contains(range)? == Interval::CERTAINLY_TRUE {
+        Ok(arg.sort_properties)
+    } else {
+        exec_err!("Input range of ATANH contains out-of-domain values")
+    }
+}
+
+/// Monotonicity depends on the quadrant.
+// TODO: Implement monotonicity of the ATAN2 function.
+pub fn atan2_monotonicity(_input: &[ExprProperties]) -> Result<SortProperties> 
{
+    Ok(SortProperties::Unordered)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn cbrt_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn ceil_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    Ok(input[0].sort_properties)
+}
+
+/// Non-increasing on \[0, π\] and then non-decreasing on \[π, 2π\].
+/// This pattern repeats periodically with a period of 2π.
+// TODO: Implement monotonicity of the COS function.
+pub fn cos_monotonicity(_input: &[ExprProperties]) -> Result<SortProperties> {
+    Ok(SortProperties::Unordered)
+}
+
+/// Non-decreasing for x ≥ 0 and symmetrically non-increasing for x ≤ 0.
+pub fn cosh_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    let arg = &input[0];
+    let range = &arg.range;
+
+    let zero_point = Interval::make_zero(&range.lower().data_type())?;
+
+    if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+        Ok(arg.sort_properties)
+    } else if range.lt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+        Ok(-arg.sort_properties)
+    } else {
+        Ok(SortProperties::Unordered)
+    }
+}
+
+/// Non-decreasing function that converts radians to degrees.
+pub fn degrees_monotonicity(input: &[ExprProperties]) -> 
Result<SortProperties> {
+    Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn exp_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn floor_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for x ≥ 0, undefined otherwise.
+pub fn ln_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    let arg = &input[0];
+    let range = &arg.range;
+
+    let zero_point = Interval::make_zero(&range.lower().data_type())?;
+
+    if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+        Ok(arg.sort_properties)
+    } else {
+        exec_err!("Input range of LN contains out-of-domain values")
+    }
+}
+
+/// Non-decreasing for x ≥ 0, undefined otherwise.
+pub fn log2_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    let arg = &input[0];
+    let range = &arg.range;
+
+    let zero_point = Interval::make_zero(&range.lower().data_type())?;
+
+    if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+        Ok(arg.sort_properties)
+    } else {
+        exec_err!("Input range of LOG2 contains out-of-domain values")
+    }
+}
+
+/// Non-decreasing for x ≥ 0, undefined otherwise.
+pub fn log10_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    let arg = &input[0];
+    let range = &arg.range;
+
+    let zero_point = Interval::make_zero(&range.lower().data_type())?;
+
+    if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+        Ok(arg.sort_properties)
+    } else {
+        exec_err!("Input range of LOG10 contains out-of-domain values")
+    }
+}
+
+/// Non-decreasing for all real numbers x.
+pub fn radians_monotonicity(input: &[ExprProperties]) -> 
Result<SortProperties> {
+    Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for all real numbers x.
+pub fn signum_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> 
{
+    Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing on \[−π/2, π/2\] and then non-increasing on \[π/2, 3π/2\].
+/// This pattern repeats periodically with a period of 2π.
+// TODO: Implement monotonicity of the SIN function.
+pub fn sin_monotonicity(_input: &[ExprProperties]) -> Result<SortProperties> {
+    Ok(SortProperties::Unordered)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn sinh_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    Ok(input[0].sort_properties)
+}
+
+/// Non-decreasing for x ≥ 0, undefined otherwise.
+pub fn sqrt_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    let arg = &input[0];
+    let range = &arg.range;
+
+    let zero_point = Interval::make_zero(&range.lower().data_type())?;
+
+    if range.gt_eq(&zero_point)? == Interval::CERTAINLY_TRUE {
+        Ok(arg.sort_properties)
+    } else {
+        exec_err!("Input range of SQRT contains out-of-domain values")
+    }
+}
+
+/// Non-decreasing between vertical asymptotes at x = k * π ± π / 2 for any
+/// integer k.
+// TODO: Implement monotonicity of the TAN function.
+pub fn tan_monotonicity(_input: &[ExprProperties]) -> Result<SortProperties> {
+    Ok(SortProperties::Unordered)
+}
+
+/// Non-decreasing for all real numbers.
+pub fn tanh_monotonicity(input: &[ExprProperties]) -> Result<SortProperties> {
+    Ok(input[0].sort_properties)
+}
diff --git a/datafusion/functions/src/math/pi.rs 
b/datafusion/functions/src/math/pi.rs
index f9403e411f..60c94b6ca6 100644
--- a/datafusion/functions/src/math/pi.rs
+++ b/datafusion/functions/src/math/pi.rs
@@ -19,10 +19,9 @@ use std::any::Any;
 
 use arrow::datatypes::DataType;
 use arrow::datatypes::DataType::Float64;
-
 use datafusion_common::{not_impl_err, Result, ScalarValue};
-use datafusion_expr::{ColumnarValue, FuncMonotonicity, Volatility};
-use datafusion_expr::{ScalarUDFImpl, Signature};
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
 
 #[derive(Debug)]
 pub struct PiFunc {
@@ -70,7 +69,8 @@ impl ScalarUDFImpl for PiFunc {
         ))))
     }
 
-    fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
-        Ok(Some(vec![Some(true)]))
+    fn monotonicity(&self, _input: &[ExprProperties]) -> 
Result<SortProperties> {
+        // This function returns a constant value.
+        Ok(SortProperties::Singleton)
     }
 }
diff --git a/datafusion/functions/src/math/round.rs 
b/datafusion/functions/src/math/round.rs
index f4a163137a..600f4fd547 100644
--- a/datafusion/functions/src/math/round.rs
+++ b/datafusion/functions/src/math/round.rs
@@ -18,15 +18,15 @@
 use std::any::Any;
 use std::sync::Arc;
 
+use crate::utils::make_scalar_function;
+
 use arrow::array::{ArrayRef, Float32Array, Float64Array, Int64Array};
 use arrow::datatypes::DataType;
 use arrow::datatypes::DataType::{Float32, Float64};
-
-use crate::utils::make_scalar_function;
 use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
 use datafusion_expr::TypeSignature::Exact;
-use datafusion_expr::{ColumnarValue, FuncMonotonicity};
-use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
 
 #[derive(Debug)]
 pub struct RoundFunc {
@@ -80,8 +80,19 @@ impl ScalarUDFImpl for RoundFunc {
         make_scalar_function(round, vec![])(args)
     }
 
-    fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
-        Ok(Some(vec![Some(true)]))
+    fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties> 
{
+        // Keeps the first argument's order iff precision is absent or `Singleton`
+        let value = &input[0];
+        let precision = input.get(1);
+
+        if precision
+            .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
+            .unwrap_or(true)
+        {
+            Ok(value.sort_properties)
+        } else {
+            Ok(SortProperties::Unordered)
+        }
     }
 }
 
@@ -179,10 +190,12 @@ pub fn round(args: &[ArrayRef]) -> Result<ArrayRef> {
 
 #[cfg(test)]
 mod test {
+    use std::sync::Arc;
+
     use crate::math::round::round;
+
     use arrow::array::{ArrayRef, Float32Array, Float64Array, Int64Array};
     use datafusion_common::cast::{as_float32_array, as_float64_array};
-    use std::sync::Arc;
 
     #[test]
     fn test_round_f32() {
diff --git a/datafusion/functions/src/math/trunc.rs 
b/datafusion/functions/src/math/trunc.rs
index 6f88099889..0c4d38564b 100644
--- a/datafusion/functions/src/math/trunc.rs
+++ b/datafusion/functions/src/math/trunc.rs
@@ -18,16 +18,16 @@
 use std::any::Any;
 use std::sync::Arc;
 
+use crate::utils::make_scalar_function;
+
 use arrow::array::{ArrayRef, Float32Array, Float64Array, Int64Array};
 use arrow::datatypes::DataType;
 use arrow::datatypes::DataType::{Float32, Float64};
-
-use crate::utils::make_scalar_function;
 use datafusion_common::ScalarValue::Int64;
 use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
 use datafusion_expr::TypeSignature::Exact;
-use datafusion_expr::{ColumnarValue, FuncMonotonicity};
-use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
 
 #[derive(Debug)]
 pub struct TruncFunc {
@@ -86,8 +86,19 @@ impl ScalarUDFImpl for TruncFunc {
         make_scalar_function(trunc, vec![])(args)
     }
 
-    fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
-        Ok(Some(vec![Some(true)]))
+    fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties> 
{
+        // Keeps the first argument's order iff precision is absent or `Singleton`
+        let value = &input[0];
+        let precision = input.get(1);
+
+        if precision
+            .map(|r| r.sort_properties.eq(&SortProperties::Singleton))
+            .unwrap_or(true)
+        {
+            Ok(value.sort_properties)
+        } else {
+            Ok(SortProperties::Unordered)
+        }
     }
 }
 
@@ -156,10 +167,12 @@ fn compute_truncate64(x: f64, y: i64) -> f64 {
 
 #[cfg(test)]
 mod test {
+    use std::sync::Arc;
+
     use crate::math::trunc::trunc;
+
     use arrow::array::{ArrayRef, Float32Array, Float64Array, Int64Array};
     use datafusion_common::cast::{as_float32_array, as_float64_array};
-    use std::sync::Arc;
 
     #[test]
     fn test_truncate_32() {
diff --git a/datafusion/physical-expr-common/src/lib.rs 
b/datafusion/physical-expr-common/src/lib.rs
index 53e3134a1b..f335958698 100644
--- a/datafusion/physical-expr-common/src/lib.rs
+++ b/datafusion/physical-expr-common/src/lib.rs
@@ -19,6 +19,5 @@ pub mod aggregate;
 pub mod expressions;
 pub mod physical_expr;
 pub mod sort_expr;
-pub mod sort_properties;
 pub mod tree_node;
 pub mod utils;
diff --git a/datafusion/physical-expr-common/src/physical_expr.rs 
b/datafusion/physical-expr-common/src/physical_expr.rs
index a0f8bdf103..00b3dd725d 100644
--- a/datafusion/physical-expr-common/src/physical_expr.rs
+++ b/datafusion/physical-expr-common/src/physical_expr.rs
@@ -20,17 +20,17 @@ use std::fmt::{Debug, Display};
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
+use crate::utils::scatter;
+
 use arrow::array::BooleanArray;
 use arrow::compute::filter_record_batch;
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{internal_err, not_impl_err, Result};
 use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::ExprProperties;
 use datafusion_expr::ColumnarValue;
 
-use crate::sort_properties::SortProperties;
-use crate::utils::scatter;
-
 /// See 
[create_physical_expr](https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html)
 /// for examples of creating `PhysicalExpr` from `Expr`
 pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
@@ -154,17 +154,13 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + 
PartialEq<dyn Any> {
     /// directly because it must remain object safe.
     fn dyn_hash(&self, _state: &mut dyn Hasher);
 
-    /// The order information of a PhysicalExpr can be estimated from its 
children.
-    /// This is especially helpful for projection expressions. If we can 
ensure that the
-    /// order of a PhysicalExpr to project matches with the order of SortExec, 
we can
-    /// eliminate that SortExecs.
-    ///
-    /// By recursively calling this function, we can obtain the overall order
-    /// information of the PhysicalExpr. Since `SortOptions` cannot fully 
handle
-    /// the propagation of unordered columns and literals, the `SortProperties`
-    /// struct is used.
-    fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
-        SortProperties::Unordered
+    /// Calculates the properties of this [`PhysicalExpr`] based on its
+    /// children's properties (i.e. order and range), recursively aggregating
+    /// the information from its children. In cases where the [`PhysicalExpr`]
+    /// has no children (e.g., `Literal` or `Column`), these properties should
+    /// be specified externally, as the function defaults to unknown 
properties.
+    fn get_properties(&self, _children: &[ExprProperties]) -> 
Result<ExprProperties> {
+        Ok(ExprProperties::new_unknown())
     }
 }
 
diff --git a/datafusion/physical-expr-common/src/utils.rs 
b/datafusion/physical-expr-common/src/utils.rs
index 459b5a4849..601d344e4a 100644
--- a/datafusion/physical-expr-common/src/utils.rs
+++ b/datafusion/physical-expr-common/src/utils.rs
@@ -15,13 +15,34 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::{
-    array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData},
-    compute::{and_kleene, is_not_null, SlicesIterator},
+use std::sync::Arc;
+
+use crate::{
+    physical_expr::PhysicalExpr, sort_expr::PhysicalSortExpr, 
tree_node::ExprContext,
 };
-use datafusion_common::Result;
 
-use crate::sort_expr::PhysicalSortExpr;
+use arrow::array::{make_array, Array, ArrayRef, BooleanArray, 
MutableArrayData};
+use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
+use datafusion_common::Result;
+use datafusion_expr::sort_properties::ExprProperties;
+
+/// Represents a [`PhysicalExpr`] node with associated properties (order and
+/// range) in a context where properties are tracked.
+pub type ExprPropertiesNode = ExprContext<ExprProperties>;
+
+impl ExprPropertiesNode {
+    /// Builds an `ExprPropertiesNode` tree for the given physical expression,
+    /// assigning unknown properties ([`ExprProperties::new_unknown`]) to the
+    /// node itself and, recursively, to every one of its child expressions.
+    pub fn new_unknown(expr: Arc<dyn PhysicalExpr>) -> Self {
+        let children = 
expr.children().into_iter().map(Self::new_unknown).collect();
+        Self {
+            expr,
+            data: ExprProperties::new_unknown(),
+            children,
+        }
+    }
+}
 
 /// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, 
next values of `truthy`
 /// are taken, when the mask evaluates `false` values null values are filled.
diff --git a/datafusion/physical-expr/src/equivalence/mod.rs 
b/datafusion/physical-expr/src/equivalence/mod.rs
index 3ce641c5aa..7faf2caae0 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -70,7 +70,6 @@ pub fn add_offset_to_expr(
 
 #[cfg(test)]
 mod tests {
-
     use super::*;
     use crate::expressions::col;
     use crate::PhysicalSortExpr;
@@ -147,7 +146,7 @@ mod tests {
         let col_f = &col("f", &test_schema)?;
         let col_g = &col("g", &test_schema)?;
         let mut eq_properties = 
EquivalenceProperties::new(test_schema.clone());
-        eq_properties.add_equal_conditions(col_a, col_c);
+        eq_properties.add_equal_conditions(col_a, col_c)?;
 
         let option_asc = SortOptions {
             descending: false,
@@ -204,7 +203,7 @@ mod tests {
 
         let mut eq_properties = 
EquivalenceProperties::new(test_schema.clone());
         // Define a and f are aliases
-        eq_properties.add_equal_conditions(col_a, col_f);
+        eq_properties.add_equal_conditions(col_a, col_f)?;
         // Column e has constant value.
         eq_properties = eq_properties.add_constants([col_e.clone()]);
 
@@ -338,11 +337,11 @@ mod tests {
         let col_y_expr = Arc::new(Column::new("y", 4)) as Arc<dyn 
PhysicalExpr>;
 
         // a and b are aliases
-        eq_properties.add_equal_conditions(&col_a_expr, &col_b_expr);
+        eq_properties.add_equal_conditions(&col_a_expr, &col_b_expr)?;
         assert_eq!(eq_properties.eq_group().len(), 1);
 
         // This new entry is redundant, size shouldn't increase
-        eq_properties.add_equal_conditions(&col_b_expr, &col_a_expr);
+        eq_properties.add_equal_conditions(&col_b_expr, &col_a_expr)?;
         assert_eq!(eq_properties.eq_group().len(), 1);
         let eq_groups = &eq_properties.eq_group().classes[0];
         assert_eq!(eq_groups.len(), 2);
@@ -351,7 +350,7 @@ mod tests {
 
         // b and c are aliases. Exising equivalence class should expand,
         // however there shouldn't be any new equivalence class
-        eq_properties.add_equal_conditions(&col_b_expr, &col_c_expr);
+        eq_properties.add_equal_conditions(&col_b_expr, &col_c_expr)?;
         assert_eq!(eq_properties.eq_group().len(), 1);
         let eq_groups = &eq_properties.eq_group().classes[0];
         assert_eq!(eq_groups.len(), 3);
@@ -360,12 +359,12 @@ mod tests {
         assert!(eq_groups.contains(&col_c_expr));
 
         // This is a new set of equality. Hence equivalent class count should 
be 2.
-        eq_properties.add_equal_conditions(&col_x_expr, &col_y_expr);
+        eq_properties.add_equal_conditions(&col_x_expr, &col_y_expr)?;
         assert_eq!(eq_properties.eq_group().len(), 2);
 
         // This equality bridges distinct equality sets.
         // Hence equivalent class count should decrease from 2 to 1.
-        eq_properties.add_equal_conditions(&col_x_expr, &col_a_expr);
+        eq_properties.add_equal_conditions(&col_x_expr, &col_a_expr)?;
         assert_eq!(eq_properties.eq_group().len(), 1);
         let eq_groups = &eq_properties.eq_group().classes[0];
         assert_eq!(eq_groups.len(), 5);
diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs 
b/datafusion/physical-expr/src/equivalence/ordering.rs
index ed4600f2d9..7857d9df72 100644
--- a/datafusion/physical-expr/src/equivalence/ordering.rs
+++ b/datafusion/physical-expr/src/equivalence/ordering.rs
@@ -223,26 +223,26 @@ fn resolve_overlap(orderings: &mut [LexOrdering], idx: 
usize, pre_idx: usize) ->
 mod tests {
     use std::sync::Arc;
 
-    use arrow::datatypes::{DataType, Field, Schema};
-    use arrow_schema::SortOptions;
-    use itertools::Itertools;
-
-    use datafusion_common::{DFSchema, Result};
-    use datafusion_expr::{Operator, ScalarUDF};
-
     use crate::equivalence::tests::{
         convert_to_orderings, convert_to_sort_exprs, create_random_schema,
-        create_test_params, generate_table_for_eq_properties, 
is_table_same_after_sort,
+        create_test_params, create_test_schema, 
generate_table_for_eq_properties,
+        is_table_same_after_sort,
     };
-    use crate::equivalence::{tests::create_test_schema, EquivalenceProperties};
     use crate::equivalence::{
-        EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass,
+        EquivalenceClass, EquivalenceGroup, EquivalenceProperties,
+        OrderingEquivalenceClass,
     };
-    use crate::expressions::Column;
-    use crate::expressions::{col, BinaryExpr};
+    use crate::expressions::{col, BinaryExpr, Column};
     use crate::utils::tests::TestScalarUDF;
     use crate::{PhysicalExpr, PhysicalSortExpr};
 
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow_schema::SortOptions;
+    use datafusion_common::{DFSchema, Result};
+    use datafusion_expr::{Operator, ScalarUDF};
+
+    use itertools::Itertools;
+
     #[test]
     fn test_ordering_satisfy() -> Result<()> {
         let input_schema = Arc::new(Schema::new(vec![
@@ -883,7 +883,7 @@ mod tests {
         };
         // a=c (e.g they are aliases).
         let mut eq_properties = EquivalenceProperties::new(test_schema);
-        eq_properties.add_equal_conditions(col_a, col_c);
+        eq_properties.add_equal_conditions(col_a, col_c)?;
 
         let orderings = vec![
             vec![(col_a, options)],
diff --git a/datafusion/physical-expr/src/equivalence/projection.rs 
b/datafusion/physical-expr/src/equivalence/projection.rs
index 260610f23d..b5ac149d8b 100644
--- a/datafusion/physical-expr/src/equivalence/projection.rs
+++ b/datafusion/physical-expr/src/equivalence/projection.rs
@@ -17,14 +17,13 @@
 
 use std::sync::Arc;
 
-use arrow::datatypes::SchemaRef;
+use crate::expressions::Column;
+use crate::PhysicalExpr;
 
+use arrow::datatypes::SchemaRef;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{internal_err, Result};
 
-use crate::expressions::Column;
-use crate::PhysicalExpr;
-
 /// Stores the mapping between source expressions and target expressions for a
 /// projection.
 #[derive(Debug, Clone)]
@@ -114,14 +113,7 @@ impl ProjectionMapping {
 
 #[cfg(test)]
 mod tests {
-
-    use arrow::datatypes::{DataType, Field, Schema};
-    use arrow_schema::{SortOptions, TimeUnit};
-    use itertools::Itertools;
-
-    use datafusion_common::DFSchema;
-    use datafusion_expr::{Operator, ScalarUDF};
-
+    use super::*;
     use crate::equivalence::tests::{
         apply_projection, convert_to_orderings, convert_to_orderings_owned,
         create_random_schema, generate_table_for_eq_properties, 
is_table_same_after_sort,
@@ -133,7 +125,12 @@ mod tests {
     use crate::utils::tests::TestScalarUDF;
     use crate::PhysicalSortExpr;
 
-    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow_schema::{SortOptions, TimeUnit};
+    use datafusion_common::DFSchema;
+    use datafusion_expr::{Operator, ScalarUDF};
+
+    use itertools::Itertools;
 
     #[test]
     fn project_orderings() -> Result<()> {
@@ -941,7 +938,7 @@ mod tests {
         for (orderings, equal_columns, expected) in test_cases {
             let mut eq_properties = EquivalenceProperties::new(schema.clone());
             for (lhs, rhs) in equal_columns {
-                eq_properties.add_equal_conditions(lhs, rhs);
+                eq_properties.add_equal_conditions(lhs, rhs)?;
             }
 
             let orderings = convert_to_orderings(&orderings);
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index c654208208..016c4c4ae1 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -18,25 +18,27 @@
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
-use arrow_schema::{SchemaRef, SortOptions};
-use indexmap::{IndexMap, IndexSet};
-use itertools::Itertools;
-
-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{JoinSide, JoinType, Result};
-
+use super::ordering::collapse_lex_ordering;
 use crate::equivalence::{
     collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass, 
ProjectionMapping,
 };
 use crate::expressions::{CastExpr, Literal};
-use crate::sort_properties::{ExprOrdering, SortProperties};
 use crate::{
     physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement,
     LexRequirementRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
     PhysicalSortRequirement,
 };
 
-use super::ordering::collapse_lex_ordering;
+use arrow_schema::{SchemaRef, SortOptions};
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{JoinSide, JoinType, Result};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+use datafusion_physical_expr_common::expressions::column::Column;
+use datafusion_physical_expr_common::utils::ExprPropertiesNode;
+
+use indexmap::{IndexMap, IndexSet};
+use itertools::Itertools;
 
 /// A `EquivalenceProperties` object stores useful information related to a 
schema.
 /// Currently, it keeps track of:
@@ -197,7 +199,7 @@ impl EquivalenceProperties {
         &mut self,
         left: &Arc<dyn PhysicalExpr>,
         right: &Arc<dyn PhysicalExpr>,
-    ) {
+    ) -> Result<()> {
         // Discover new constants in light of new the equality:
         if self.is_expr_constant(left) {
             // Left expression is constant, add right as constant
@@ -216,27 +218,34 @@ impl EquivalenceProperties {
         let mut new_orderings = vec![];
         for ordering in self.normalized_oeq_class().iter() {
             let expressions = if left.eq(&ordering[0].expr) {
-                // left expression is leading ordering
+                // Left expression is leading ordering
                 Some((ordering[0].options, right))
             } else if right.eq(&ordering[0].expr) {
-                // right expression is leading ordering
+                // Right expression is leading ordering
                 Some((ordering[0].options, left))
             } else {
                 None
             };
             if let Some((leading_ordering, other_expr)) = expressions {
-                // Only handle expressions with exactly one child
-                // TODO: it should be possible to handle expressions orderings 
f(a, b, c), a, b, c
-                // if f is monotonic in all arguments
-                // First Expression after leading ordering
+                // Currently, we only handle expressions with a single child.
+                // TODO: It should be possible to handle expressions orderings 
like
+                //       f(a, b, c), a, b, c if f is monotonic in all 
arguments.
+                // First expression after leading ordering
                 if let Some(next_expr) = ordering.get(1) {
                     let children = other_expr.children();
                     if children.len() == 1
                         && children[0].eq(&next_expr.expr)
                         && SortProperties::Ordered(leading_ordering)
-                            == 
other_expr.get_ordering(&[SortProperties::Ordered(
-                                next_expr.options,
-                            )])
+                            == other_expr
+                                .get_properties(&[ExprProperties {
+                                    sort_properties: SortProperties::Ordered(
+                                        next_expr.options,
+                                    ),
+                                    range: Interval::make_unbounded(
+                                        &children[0].data_type(&self.schema)?,
+                                    )?,
+                                }])?
+                                .sort_properties
                     {
                         // Assume existing ordering is [a ASC, b ASC]
                         // When equality a = f(b) is given, If we know that 
given ordering `[b ASC]`, ordering `[f(b) ASC]` is valid,
@@ -254,6 +263,7 @@ impl EquivalenceProperties {
 
         // Add equal expressions to the state
         self.eq_group.add_equal_conditions(left, right);
+        Ok(())
     }
 
     /// Track/register physical expressions with constant values.
@@ -378,11 +388,15 @@ impl EquivalenceProperties {
     ///
     /// Returns `true` if the specified ordering is satisfied, `false` 
otherwise.
     fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool {
-        let expr_ordering = self.get_expr_ordering(req.expr.clone());
-        let ExprOrdering { expr, data, .. } = expr_ordering;
-        match data {
+        let ExprProperties {
+            sort_properties, ..
+        } = self.get_expr_properties(req.expr.clone());
+        match sort_properties {
             SortProperties::Ordered(options) => {
-                let sort_expr = PhysicalSortExpr { expr, options };
+                let sort_expr = PhysicalSortExpr {
+                    expr: req.expr.clone(),
+                    options,
+                };
                 sort_expr.satisfy(req, self.schema())
             }
             // Singleton expressions satisfies any ordering.
@@ -698,8 +712,9 @@ impl EquivalenceProperties {
             referred_dependencies(&dependency_map, source)
                 .into_iter()
                 .filter_map(|relevant_deps| {
-                    if let SortProperties::Ordered(options) =
-                        get_expr_ordering(source, &relevant_deps)
+                    if let Ok(SortProperties::Ordered(options)) =
+                        get_expr_properties(source, &relevant_deps, 
&self.schema)
+                            .map(|prop| prop.sort_properties)
                     {
                         Some((options, relevant_deps))
                     } else {
@@ -837,16 +852,27 @@ impl EquivalenceProperties {
             let ordered_exprs = search_indices
                 .iter()
                 .flat_map(|&idx| {
-                    let ExprOrdering { expr, data, .. } =
-                        eq_properties.get_expr_ordering(exprs[idx].clone());
-                    match data {
-                        SortProperties::Ordered(options) => {
-                            Some((PhysicalSortExpr { expr, options }, idx))
-                        }
+                    let ExprProperties {
+                        sort_properties, ..
+                    } = eq_properties.get_expr_properties(exprs[idx].clone());
+                    match sort_properties {
+                        SortProperties::Ordered(options) => Some((
+                            PhysicalSortExpr {
+                                expr: exprs[idx].clone(),
+                                options,
+                            },
+                            idx,
+                        )),
                         SortProperties::Singleton => {
                             // Assign default ordering to constant expressions
                             let options = SortOptions::default();
-                            Some((PhysicalSortExpr { expr, options }, idx))
+                            Some((
+                                PhysicalSortExpr {
+                                    expr: exprs[idx].clone(),
+                                    options,
+                                },
+                                idx,
+                            ))
                         }
                         SortProperties::Unordered => None,
                     }
@@ -895,32 +921,33 @@ impl EquivalenceProperties {
         is_constant_recurse(&normalized_constants, &normalized_expr)
     }
 
-    /// Retrieves the ordering information for a given physical expression.
+    /// Retrieves the properties for a given physical expression.
     ///
-    /// This function constructs an `ExprOrdering` object for the provided
+    /// This function constructs an [`ExprProperties`] object for the given
     /// expression, which encapsulates information about the expression's
-    /// ordering, including its [`SortProperties`].
+    /// properties, including its [`SortProperties`] and [`Interval`].
     ///
-    /// # Arguments
+    /// # Parameters
     ///
     /// - `expr`: An `Arc<dyn PhysicalExpr>` representing the physical 
expression
     ///   for which ordering information is sought.
     ///
     /// # Returns
     ///
-    /// Returns an `ExprOrdering` object containing the ordering information 
for
-    /// the given expression.
-    pub fn get_expr_ordering(&self, expr: Arc<dyn PhysicalExpr>) -> 
ExprOrdering {
-        ExprOrdering::new_default(expr.clone())
-            .transform_up(|expr| Ok(update_ordering(expr, self)))
+    /// Returns an [`ExprProperties`] object containing the ordering and range
+    /// information for the given expression.
+    pub fn get_expr_properties(&self, expr: Arc<dyn PhysicalExpr>) -> 
ExprProperties {
+        ExprPropertiesNode::new_unknown(expr)
+            .transform_up(|expr| update_properties(expr, self))
             .data()
-            // Guaranteed to always return `Ok`.
-            .unwrap()
+            .map(|node| node.data)
+            .unwrap_or_else(|_| ExprProperties::new_unknown())
     }
 }
 
-/// Calculates the [`SortProperties`] of a given [`ExprOrdering`] node.
-/// The node can either be a leaf node, or an intermediate node:
+/// Calculates the properties of a given [`ExprPropertiesNode`].
+///
+/// Order information can be retrieved as:
 /// - If it is a leaf node, we directly find the order of the node by looking
 /// at the given sort expression and equivalence properties if it is a `Column`
 /// leaf, or we mark it as unordered. In the case of a `Literal` leaf, we mark
@@ -931,30 +958,41 @@ impl EquivalenceProperties {
 /// node directly matches with the sort expression. If there is a match, the
 /// sort expression emerges at that node immediately, discarding the recursive
 /// result coming from its children.
-fn update_ordering(
-    mut node: ExprOrdering,
+///
+/// Range information is calculated as:
+/// - If it is a `Literal` node, we set the range as a point value. If it is a
+/// `Column` node, we set the datatype of the range, but cannot give an 
interval
+/// for the range, yet.
+/// - If it is an intermediate node, the children states matter. Each 
`PhysicalExpr`
+/// and operator has its own rules on how to propagate the children range.
+fn update_properties(
+    mut node: ExprPropertiesNode,
     eq_properties: &EquivalenceProperties,
-) -> Transformed<ExprOrdering> {
-    // We have a Column, which is one of the two possible leaf node types:
+) -> Result<Transformed<ExprPropertiesNode>> {
+    // First, try to gather the information from the children:
+    if !node.expr.children().is_empty() {
+        // We have an intermediate (non-leaf) node, account for its children:
+        let children_props = node.children.iter().map(|c| 
c.data.clone()).collect_vec();
+        node.data = node.expr.get_properties(&children_props)?;
+    } else if node.expr.as_any().is::<Literal>() {
+        // We have a Literal, which is one of the two possible leaf node types:
+        node.data = node.expr.get_properties(&[])?;
+    } else if node.expr.as_any().is::<Column>() {
+        // We have a Column, which is the other possible leaf node type:
+        node.data.range =
+            
Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)?
+    }
+    // Now, check what we know about orderings:
     let normalized_expr = 
eq_properties.eq_group.normalize_expr(node.expr.clone());
     if eq_properties.is_expr_constant(&normalized_expr) {
-        node.data = SortProperties::Singleton;
+        node.data.sort_properties = SortProperties::Singleton;
     } else if let Some(options) = eq_properties
         .normalized_oeq_class()
         .get_options(&normalized_expr)
     {
-        node.data = SortProperties::Ordered(options);
-    } else if !node.expr.children().is_empty() {
-        // We have an intermediate (non-leaf) node, account for its children:
-        let children_orderings = node.children.iter().map(|c| 
c.data).collect_vec();
-        node.data = node.expr.get_ordering(&children_orderings);
-    } else if node.expr.as_any().is::<Literal>() {
-        // We have a Literal, which is the other possible leaf node type:
-        node.data = node.expr.get_ordering(&[]);
-    } else {
-        return Transformed::no(node);
+        node.data.sort_properties = SortProperties::Ordered(options);
     }
-    Transformed::yes(node)
+    Ok(Transformed::yes(node))
 }
 
 /// This function determines whether the provided expression is constant
@@ -1124,8 +1162,9 @@ fn generate_dependency_orderings(
         .collect()
 }
 
-/// This function examines the given expression and the sort expressions it
-/// refers to determine the ordering properties of the expression.
+/// This function examines the given expression and its properties to determine
+/// the ordering properties of the expression. The range knowledge is not 
utilized
+/// yet in the scope of this function.
 ///
 /// # Parameters
 ///
@@ -1133,26 +1172,41 @@ fn generate_dependency_orderings(
 ///   which ordering properties need to be determined.
 /// - `dependencies`: A reference to `Dependencies`, containing sort 
expressions
 ///   referred to by `expr`.
+/// - `schema`: A reference to the schema which the `expr` columns refer to.
 ///
 /// # Returns
 ///
 /// A `SortProperties` indicating the ordering information of the given 
expression.
-fn get_expr_ordering(
+fn get_expr_properties(
     expr: &Arc<dyn PhysicalExpr>,
     dependencies: &Dependencies,
-) -> SortProperties {
+    schema: &SchemaRef,
+) -> Result<ExprProperties> {
     if let Some(column_order) = dependencies.iter().find(|&order| 
expr.eq(&order.expr)) {
         // If exact match is found, return its ordering.
-        SortProperties::Ordered(column_order.options)
+        Ok(ExprProperties {
+            sort_properties: SortProperties::Ordered(column_order.options),
+            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
+        })
+    } else if expr.as_any().is::<Column>() {
+        Ok(ExprProperties {
+            sort_properties: SortProperties::Unordered,
+            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
+        })
+    } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
+        Ok(ExprProperties {
+            sort_properties: SortProperties::Singleton,
+            range: Interval::try_new(literal.value().clone(), 
literal.value().clone())?,
+        })
     } else {
         // Find orderings of its children
         let child_states = expr
             .children()
             .iter()
-            .map(|child| get_expr_ordering(child, dependencies))
-            .collect::<Vec<_>>();
+            .map(|child| get_expr_properties(child, dependencies, schema))
+            .collect::<Result<Vec<_>>>()?;
         // Calculate expression ordering using ordering of its children.
-        expr.get_ordering(&child_states)
+        expr.get_properties(&child_states)
     }
 }
 
@@ -1351,12 +1405,7 @@ impl Hash for ExprWrapper {
 mod tests {
     use std::ops::Not;
 
-    use arrow::datatypes::{DataType, Field, Schema};
-    use arrow_schema::{Fields, TimeUnit};
-
-    use datafusion_common::DFSchema;
-    use datafusion_expr::{Operator, ScalarUDF};
-
+    use super::*;
     use crate::equivalence::add_offset_to_expr;
     use crate::equivalence::tests::{
         convert_to_orderings, convert_to_sort_exprs, convert_to_sort_reqs,
@@ -1366,7 +1415,10 @@ mod tests {
     use crate::expressions::{col, BinaryExpr, Column};
     use crate::utils::tests::TestScalarUDF;
 
-    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow_schema::{Fields, TimeUnit};
+    use datafusion_common::DFSchema;
+    use datafusion_expr::{Operator, ScalarUDF};
 
     #[test]
     fn project_equivalence_properties_test() -> Result<()> {
@@ -1577,8 +1629,8 @@ mod tests {
 
         let mut join_eq_properties = 
EquivalenceProperties::new(Arc::new(schema));
         // a=x and d=w
-        join_eq_properties.add_equal_conditions(col_a, col_x);
-        join_eq_properties.add_equal_conditions(col_d, col_w);
+        join_eq_properties.add_equal_conditions(col_a, col_x)?;
+        join_eq_properties.add_equal_conditions(col_d, col_w)?;
 
         updated_right_ordering_equivalence_class(
             &mut right_oeq_class,
@@ -1615,7 +1667,7 @@ mod tests {
         let col_c_expr = col("c", &schema)?;
         let mut eq_properties = 
EquivalenceProperties::new(Arc::new(schema.clone()));
 
-        eq_properties.add_equal_conditions(&col_a_expr, &col_c_expr);
+        eq_properties.add_equal_conditions(&col_a_expr, &col_c_expr)?;
         let others = vec![
             vec![PhysicalSortExpr {
                 expr: col_b_expr.clone(),
@@ -1760,7 +1812,7 @@ mod tests {
     }
 
     #[test]
-    fn test_update_ordering() -> Result<()> {
+    fn test_update_properties() -> Result<()> {
         let schema = Schema::new(vec![
             Field::new("a", DataType::Int32, true),
             Field::new("b", DataType::Int32, true),
@@ -1778,7 +1830,7 @@ mod tests {
             nulls_first: false,
         };
         // b=a (e.g they are aliases)
-        eq_properties.add_equal_conditions(col_b, col_a);
+        eq_properties.add_equal_conditions(col_b, col_a)?;
         // [b ASC], [d ASC]
         eq_properties.add_new_orderings(vec![
             vec![PhysicalSortExpr {
@@ -1821,12 +1873,12 @@ mod tests {
                 .iter()
                 .flat_map(|ordering| ordering.first().cloned())
                 .collect::<Vec<_>>();
-            let expr_ordering = eq_properties.get_expr_ordering(expr.clone());
+            let expr_props = eq_properties.get_expr_properties(expr.clone());
             let err_msg = format!(
                 "expr:{:?}, expected: {:?}, actual: {:?}, leading_orderings: 
{leading_orderings:?}",
-                expr, expected, expr_ordering.data
+                expr, expected, expr_props.sort_properties
             );
-            assert_eq!(expr_ordering.data, expected, "{}", err_msg);
+            assert_eq!(expr_props.sort_properties, expected, "{}", err_msg);
         }
 
         Ok(())
@@ -2266,6 +2318,7 @@ mod tests {
 
         Ok(())
     }
+
     #[test]
     fn test_eliminate_redundant_monotonic_sorts() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
@@ -2334,7 +2387,7 @@ mod tests {
         for case in cases {
             let mut properties = 
base_properties.clone().add_constants(case.constants);
             for [left, right] in &case.equal_conditions {
-                properties.add_equal_conditions(left, right)
+                properties.add_equal_conditions(left, right)?
             }
 
             let sort = case
diff --git a/datafusion/physical-expr/src/expressions/binary.rs 
b/datafusion/physical-expr/src/expressions/binary.rs
index 76154dca03..08f7523f92 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -23,21 +23,21 @@ use std::{any::Any, sync::Arc};
 use crate::expressions::datum::{apply, apply_cmp};
 use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
 use crate::physical_expr::down_cast_any_ref;
-use crate::sort_properties::SortProperties;
 use crate::PhysicalExpr;
 
 use arrow::array::*;
 use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
 use arrow::compute::kernels::cmp::*;
-use arrow::compute::kernels::comparison::regexp_is_match_utf8;
-use arrow::compute::kernels::comparison::regexp_is_match_utf8_scalar;
+use arrow::compute::kernels::comparison::{
+    regexp_is_match_utf8, regexp_is_match_utf8_scalar,
+};
 use arrow::compute::kernels::concat_elements::concat_elements_utf8;
 use arrow::compute::{cast, ilike, like, nilike, nlike};
 use arrow::datatypes::*;
-
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::{internal_err, Result, ScalarValue};
 use datafusion_expr::interval_arithmetic::{apply_operator, Interval};
+use datafusion_expr::sort_properties::ExprProperties;
 use datafusion_expr::type_coercion::binary::get_result_type;
 use datafusion_expr::{ColumnarValue, Operator};
 
@@ -442,17 +442,45 @@ impl PhysicalExpr for BinaryExpr {
         self.hash(&mut s);
     }
 
-    /// For each operator, [`BinaryExpr`] has distinct ordering rules.
-    /// TODO: There may be rules specific to some data types (such as division 
and multiplication on unsigned integers)
-    fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
-        let (left_child, right_child) = (&children[0], &children[1]);
+    /// For each operator, [`BinaryExpr`] has distinct rules.
+    /// TODO: There may be rules specific to some data types and expression 
ranges.
+    fn get_properties(&self, children: &[ExprProperties]) -> 
Result<ExprProperties> {
+        let (l_order, l_range) = (children[0].sort_properties, 
&children[0].range);
+        let (r_order, r_range) = (children[1].sort_properties, 
&children[1].range);
         match self.op() {
-            Operator::Plus => left_child.add(right_child),
-            Operator::Minus => left_child.sub(right_child),
-            Operator::Gt | Operator::GtEq => 
left_child.gt_or_gteq(right_child),
-            Operator::Lt | Operator::LtEq => 
right_child.gt_or_gteq(left_child),
-            Operator::And | Operator::Or => left_child.and_or(right_child),
-            _ => SortProperties::Unordered,
+            Operator::Plus => Ok(ExprProperties {
+                sort_properties: l_order.add(&r_order),
+                range: l_range.add(r_range)?,
+            }),
+            Operator::Minus => Ok(ExprProperties {
+                sort_properties: l_order.sub(&r_order),
+                range: l_range.sub(r_range)?,
+            }),
+            Operator::Gt => Ok(ExprProperties {
+                sort_properties: l_order.gt_or_gteq(&r_order),
+                range: l_range.gt(r_range)?,
+            }),
+            Operator::GtEq => Ok(ExprProperties {
+                sort_properties: l_order.gt_or_gteq(&r_order),
+                range: l_range.gt_eq(r_range)?,
+            }),
+            Operator::Lt => Ok(ExprProperties {
+                sort_properties: r_order.gt_or_gteq(&l_order),
+                range: l_range.lt(r_range)?,
+            }),
+            Operator::LtEq => Ok(ExprProperties {
+                sort_properties: r_order.gt_or_gteq(&l_order),
+                range: l_range.lt_eq(r_range)?,
+            }),
+            Operator::And => Ok(ExprProperties {
+                sort_properties: r_order.and_or(&l_order),
+                range: l_range.and(r_range)?,
+            }),
+            Operator::Or => Ok(ExprProperties {
+                sort_properties: r_order.and_or(&l_order),
+                range: l_range.or(r_range)?,
+            }),
+            _ => Ok(ExprProperties::new_unknown()),
         }
     }
 }
@@ -623,6 +651,7 @@ pub fn binary(
 mod tests {
     use super::*;
     use crate::expressions::{col, lit, try_cast, Literal};
+
     use datafusion_common::plan_datafusion_err;
     use datafusion_expr::type_coercion::binary::get_input_types;
 
diff --git a/datafusion/physical-expr/src/expressions/cast.rs 
b/datafusion/physical-expr/src/expressions/cast.rs
index a3b32461e5..79a44ac30c 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -15,21 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::physical_expr::down_cast_any_ref;
-use crate::sort_properties::SortProperties;
-use crate::PhysicalExpr;
 use std::any::Any;
 use std::fmt;
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
-use DataType::*;
+
+use crate::physical_expr::down_cast_any_ref;
+use crate::PhysicalExpr;
 
 use arrow::compute::{can_cast_types, CastOptions};
-use arrow::datatypes::{DataType, Schema};
+use arrow::datatypes::{DataType, DataType::*, Schema};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::format::DEFAULT_FORMAT_OPTIONS;
 use datafusion_common::{not_impl_err, Result};
 use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::ExprProperties;
 use datafusion_expr::ColumnarValue;
 
 const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions {
@@ -163,9 +163,21 @@ impl PhysicalExpr for CastExpr {
         self.cast_options.hash(&mut s);
     }
 
-    /// A [`CastExpr`] preserves the ordering of its child.
-    fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
-        children[0]
+    /// A [`CastExpr`] preserves the ordering of its child if the cast is done
+    /// under the same datatype family.
+    fn get_properties(&self, children: &[ExprProperties]) -> 
Result<ExprProperties> {
+        let source_datatype = children[0].range.data_type();
+        let target_type = &self.cast_type;
+
+        let unbounded = Interval::make_unbounded(target_type)?;
+        if source_datatype.is_numeric() && target_type.is_numeric()
+            || source_datatype.is_temporal() && target_type.is_temporal()
+            || source_datatype.eq(target_type)
+        {
+            Ok(children[0].clone().with_range(unbounded))
+        } else {
+            Ok(ExprProperties::new_unknown().with_range(unbounded))
+        }
     }
 }
 
diff --git a/datafusion/physical-expr/src/expressions/literal.rs 
b/datafusion/physical-expr/src/expressions/literal.rs
index 35ea80ea57..371028959a 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -22,7 +22,6 @@ use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use crate::physical_expr::down_cast_any_ref;
-use crate::sort_properties::SortProperties;
 use crate::PhysicalExpr;
 
 use arrow::{
@@ -30,6 +29,8 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
 use datafusion_expr::{ColumnarValue, Expr};
 
 /// Represents a literal value
@@ -90,8 +91,11 @@ impl PhysicalExpr for Literal {
         self.hash(&mut s);
     }
 
-    fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
-        SortProperties::Singleton
+    fn get_properties(&self, _children: &[ExprProperties]) -> 
Result<ExprProperties> {
+        Ok(ExprProperties {
+            sort_properties: SortProperties::Singleton,
+            range: Interval::try_new(self.value().clone(), 
self.value().clone())?,
+        })
     }
 }
 
@@ -115,6 +119,7 @@ pub fn lit<T: datafusion_expr::Literal>(value: T) -> 
Arc<dyn PhysicalExpr> {
 #[cfg(test)]
 mod tests {
     use super::*;
+
     use arrow::array::Int32Array;
     use arrow::datatypes::*;
     use datafusion_common::cast::as_int32_array;
diff --git a/datafusion/physical-expr/src/expressions/negative.rs 
b/datafusion/physical-expr/src/expressions/negative.rs
index f6d4620c42..62f865bd9b 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -22,7 +22,6 @@ use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use crate::physical_expr::down_cast_any_ref;
-use crate::sort_properties::SortProperties;
 use crate::PhysicalExpr;
 
 use arrow::{
@@ -32,6 +31,7 @@ use arrow::{
 };
 use datafusion_common::{plan_err, Result};
 use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::ExprProperties;
 use datafusion_expr::{
     type_coercion::{is_interval, is_null, is_signed_numeric, is_timestamp},
     ColumnarValue,
@@ -134,8 +134,11 @@ impl PhysicalExpr for NegativeExpr {
     }
 
     /// The ordering of a [`NegativeExpr`] is simply the reverse of its child.
-    fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
-        -children[0]
+    fn get_properties(&self, children: &[ExprProperties]) -> 
Result<ExprProperties> {
+        Ok(ExprProperties {
+            sort_properties: -children[0].sort_properties,
+            range: children[0].range.clone().arithmetic_negate()?,
+        })
     }
 }
 
diff --git a/datafusion/physical-expr/src/functions.rs 
b/datafusion/physical-expr/src/functions.rs
index 21cf6d348c..9c7d6d0934 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -33,14 +33,12 @@
 
 use std::sync::Arc;
 
-use arrow::array::ArrayRef;
-use arrow_array::Array;
-
-pub use crate::scalar_function::create_physical_expr;
+use arrow::array::{Array, ArrayRef};
 use datafusion_common::{Result, ScalarValue};
-pub use datafusion_expr::FuncMonotonicity;
 use datafusion_expr::{ColumnarValue, ScalarFunctionImplementation};
 
+pub use crate::scalar_function::create_physical_expr;
+
 #[derive(Debug, Clone, Copy)]
 pub enum Hint {
     /// Indicates the argument needs to be padded if it is scalar
diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs
index aef5aa7c00..1bdf082b2e 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -61,13 +61,6 @@ pub use scalar_function::ScalarFunctionExpr;
 pub use datafusion_physical_expr_common::utils::reverse_order_bys;
 pub use utils::split_conjunction;
 
-// For backwards compatibility
-pub mod sort_properties {
-    pub use datafusion_physical_expr_common::sort_properties::{
-        ExprOrdering, SortProperties,
-    };
-}
-
 // For backwards compatibility
 pub mod tree_node {
     pub use datafusion_physical_expr_common::tree_node::ExprContext;
diff --git a/datafusion/physical-expr/src/scalar_function.rs 
b/datafusion/physical-expr/src/scalar_function.rs
index 1244a9b4db..daa1100710 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -32,19 +32,18 @@
 use std::any::Any;
 use std::fmt::{self, Debug, Formatter};
 use std::hash::{Hash, Hasher};
-use std::ops::Neg;
 use std::sync::Arc;
 
+use crate::physical_expr::{down_cast_any_ref, physical_exprs_equal};
+use crate::PhysicalExpr;
+
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
-
 use datafusion_common::{internal_err, DFSchema, Result};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::ExprProperties;
 use datafusion_expr::type_coercion::functions::data_types_with_scalar_udf;
-use datafusion_expr::{expr_vec_fmt, ColumnarValue, Expr, FuncMonotonicity, 
ScalarUDF};
-
-use crate::physical_expr::{down_cast_any_ref, physical_exprs_equal};
-use crate::sort_properties::SortProperties;
-use crate::PhysicalExpr;
+use datafusion_expr::{expr_vec_fmt, ColumnarValue, Expr, ScalarUDF};
 
 /// Physical expression of a scalar function
 pub struct ScalarFunctionExpr {
@@ -52,11 +51,6 @@ pub struct ScalarFunctionExpr {
     name: String,
     args: Vec<Arc<dyn PhysicalExpr>>,
     return_type: DataType,
-    // Keeps monotonicity information of the function.
-    // FuncMonotonicity vector is one to one mapped to `args`,
-    // and it specifies the effect of an increase or decrease in
-    // the corresponding `arg` to the function value.
-    monotonicity: Option<FuncMonotonicity>,
 }
 
 impl Debug for ScalarFunctionExpr {
@@ -66,7 +60,6 @@ impl Debug for ScalarFunctionExpr {
             .field("name", &self.name)
             .field("args", &self.args)
             .field("return_type", &self.return_type)
-            .field("monotonicity", &self.monotonicity)
             .finish()
     }
 }
@@ -78,14 +71,12 @@ impl ScalarFunctionExpr {
         fun: Arc<ScalarUDF>,
         args: Vec<Arc<dyn PhysicalExpr>>,
         return_type: DataType,
-        monotonicity: Option<FuncMonotonicity>,
     ) -> Self {
         Self {
             fun,
             name: name.to_owned(),
             args,
             return_type,
-            monotonicity,
         }
     }
 
@@ -108,11 +99,6 @@ impl ScalarFunctionExpr {
     pub fn return_type(&self) -> &DataType {
         &self.return_type
     }
-
-    /// Monotonicity information of the function
-    pub fn monotonicity(&self) -> &Option<FuncMonotonicity> {
-        &self.monotonicity
-    }
 }
 
 impl fmt::Display for ScalarFunctionExpr {
@@ -170,10 +156,21 @@ impl PhysicalExpr for ScalarFunctionExpr {
             self.fun.clone(),
             children,
             self.return_type().clone(),
-            self.monotonicity.clone(),
         )))
     }
 
+    fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
+        self.fun.evaluate_bounds(children)
+    }
+
+    fn propagate_constraints(
+        &self,
+        interval: &Interval,
+        children: &[&Interval],
+    ) -> Result<Option<Vec<Interval>>> {
+        self.fun.propagate_constraints(interval, children)
+    }
+
     fn dyn_hash(&self, state: &mut dyn Hasher) {
         let mut s = state;
         self.name.hash(&mut s);
@@ -182,11 +179,18 @@ impl PhysicalExpr for ScalarFunctionExpr {
         // Add `self.fun` when hash is available
     }
 
-    fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
-        self.monotonicity
-            .as_ref()
-            .map(|monotonicity| out_ordering(monotonicity, children))
-            .unwrap_or(SortProperties::Unordered)
+    fn get_properties(&self, children: &[ExprProperties]) -> 
Result<ExprProperties> {
+        let sort_properties = self.fun.monotonicity(children)?;
+        let children_range = children
+            .iter()
+            .map(|props| &props.range)
+            .collect::<Vec<_>>();
+        let range = self.fun().evaluate_bounds(&children_range)?;
+
+        Ok(ExprProperties {
+            sort_properties,
+            range,
+        })
     }
 }
 
@@ -231,63 +235,5 @@ pub fn create_physical_expr(
         Arc::new(fun.clone()),
         input_phy_exprs.to_vec(),
         return_type,
-        fun.monotonicity()?,
     )))
 }
-
-/// Determines a [ScalarFunctionExpr]'s monotonicity for the given arguments
-/// and the function's behavior depending on its arguments.
-///
-/// [ScalarFunctionExpr]: crate::scalar_function::ScalarFunctionExpr
-pub fn out_ordering(
-    func: &FuncMonotonicity,
-    arg_orderings: &[SortProperties],
-) -> SortProperties {
-    func.iter().zip(arg_orderings).fold(
-        SortProperties::Singleton,
-        |prev_sort, (item, arg)| {
-            let current_sort = func_order_in_one_dimension(item, arg);
-
-            match (prev_sort, current_sort) {
-                (_, SortProperties::Unordered) => SortProperties::Unordered,
-                (SortProperties::Singleton, SortProperties::Ordered(_)) => 
current_sort,
-                (SortProperties::Ordered(prev), 
SortProperties::Ordered(current))
-                    if prev.descending != current.descending =>
-                {
-                    SortProperties::Unordered
-                }
-                _ => prev_sort,
-            }
-        },
-    )
-}
-
-/// This function decides the monotonicity property of a [ScalarFunctionExpr] 
for a single argument (i.e. across a single dimension), given that argument's 
sort properties.
-///
-/// [ScalarFunctionExpr]: crate::scalar_function::ScalarFunctionExpr
-fn func_order_in_one_dimension(
-    func_monotonicity: &Option<bool>,
-    arg: &SortProperties,
-) -> SortProperties {
-    if *arg == SortProperties::Singleton {
-        SortProperties::Singleton
-    } else {
-        match func_monotonicity {
-            None => SortProperties::Unordered,
-            Some(false) => {
-                if let SortProperties::Ordered(_) = arg {
-                    arg.neg()
-                } else {
-                    SortProperties::Unordered
-                }
-            }
-            Some(true) => {
-                if let SortProperties::Ordered(_) = arg {
-                    *arg
-                } else {
-                    SortProperties::Unordered
-                }
-            }
-        }
-    }
-}
diff --git a/datafusion/physical-expr/src/utils/mod.rs 
b/datafusion/physical-expr/src/utils/mod.rs
index 76cee3a1a7..6b964546cb 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -255,19 +255,18 @@ pub fn merge_vectors(
 
 #[cfg(test)]
 pub(crate) mod tests {
-    use arrow_array::{ArrayRef, Float32Array, Float64Array};
     use std::any::Any;
     use std::fmt::{Display, Formatter};
 
     use super::*;
     use crate::expressions::{binary, cast, col, in_list, lit, Literal};
 
+    use arrow_array::{ArrayRef, Float32Array, Float64Array};
     use arrow_schema::{DataType, Field, Schema};
     use datafusion_common::{exec_err, DataFusionError, ScalarValue};
+    use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+    use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
 
-    use datafusion_expr::{
-        ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility,
-    };
     use petgraph::visit::Bfs;
 
     #[derive(Debug, Clone)]
@@ -309,8 +308,8 @@ pub(crate) mod tests {
             }
         }
 
-        fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
-            Ok(Some(vec![Some(true)]))
+        fn monotonicity(&self, input: &[ExprProperties]) -> 
Result<SortProperties> {
+            Ok(input[0].sort_properties)
         }
 
         fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs 
b/datafusion/physical-plan/src/aggregates/mod.rs
index 95376e7e69..21608db40d 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -2080,7 +2080,7 @@ mod tests {
         let col_c = &col("c", &test_schema)?;
         let mut eq_properties = EquivalenceProperties::new(test_schema);
         // Columns a and b are equal.
-        eq_properties.add_equal_conditions(col_a, col_b);
+        eq_properties.add_equal_conditions(col_a, col_b)?;
         // Aggregate requirements are
         // [None], [a ASC], [a ASC, b ASC, c ASC], [a ASC, b ASC] respectively
         let order_by_exprs = vec![
diff --git a/datafusion/physical-plan/src/filter.rs 
b/datafusion/physical-plan/src/filter.rs
index bf1ab8b731..6729e3b9e6 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -192,7 +192,7 @@ impl FilterExec {
         let mut eq_properties = input.equivalence_properties().clone();
         let (equal_pairs, _) = collect_columns_from_predicate(predicate);
         for (lhs, rhs) in equal_pairs {
-            eq_properties.add_equal_conditions(lhs, rhs)
+            eq_properties.add_equal_conditions(lhs, rhs)?
         }
         // Add the columns that have only one viable value (singleton) after
         // filtering to constants.
@@ -433,13 +433,12 @@ pub type EqualAndNonEqual<'a> =
 
 #[cfg(test)]
 mod tests {
-
     use super::*;
+    use crate::empty::EmptyExec;
     use crate::expressions::*;
     use crate::test;
     use crate::test::exec::StatisticsExec;
 
-    use crate::empty::EmptyExec;
     use arrow::datatypes::{Field, Schema};
     use arrow_schema::{UnionFields, UnionMode};
     use datafusion_common::ScalarValue;
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs 
b/datafusion/proto/src/physical_plan/from_proto.rs
index a290f30586..b7bc60a048 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -354,7 +354,6 @@ pub fn parse_physical_expr(
                 scalar_fun_def,
                 args,
                 convert_required!(e.return_type)?,
-                None,
             ))
         }
         ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index dd8e450d31..79abecf556 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -624,7 +624,6 @@ fn roundtrip_scalar_udf() -> Result<()> {
         fun_def,
         vec![col("a", &schema)?],
         DataType::Int64,
-        None,
     );
 
     let project =
@@ -752,7 +751,6 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> {
         Arc::new(udf.clone()),
         vec![col("text", &schema)?],
         DataType::Int64,
-        None,
     ));
 
     let filter = Arc::new(FilterExec::try_new(
diff --git a/datafusion/sqllogictest/test_files/order.slt 
b/datafusion/sqllogictest/test_files/order.slt
index 0f869fc0b4..fb07d5ebe8 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -955,3 +955,154 @@ drop table foo;
 
 statement ok
 drop table ambiguity_test;
+
+# Casting from numeric to string types breaks the ordering
+statement ok
+CREATE EXTERNAL TABLE ordered_table (
+  a0 INT,
+  a INT,
+  b INT,
+  c INT,
+  d INT
+)
+STORED AS CSV
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv'
+OPTIONS ('format.has_header' 'true');
+
+query T
+SELECT CAST(c as VARCHAR) as c_str
+FROM ordered_table
+ORDER BY c_str
+limit 5;
+----
+0
+1
+10
+11
+12
+
+query TT
+EXPLAIN SELECT CAST(c as VARCHAR) as c_str
+FROM ordered_table
+ORDER BY c_str
+limit 5;
+----
+logical_plan
+01)Limit: skip=0, fetch=5
+02)--Sort: c_str ASC NULLS LAST, fetch=5
+03)----Projection: CAST(ordered_table.c AS Utf8) AS c_str
+04)------TableScan: ordered_table projection=[c]
+physical_plan
+01)GlobalLimitExec: skip=0, fetch=5
+02)--SortPreservingMergeExec: [c_str@0 ASC NULLS LAST], fetch=5
+03)----SortExec: TopK(fetch=5), expr=[c_str@0 ASC NULLS LAST], 
preserve_partitioning=[true]
+04)------ProjectionExec: expr=[CAST(c@0 AS Utf8) as c_str]
+05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+06)----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], 
output_ordering=[c@0 ASC NULLS LAST], has_header=true
+
+
+# Casting from numeric to numeric types preserves the ordering
+query I
+SELECT CAST(c as BIGINT) as c_bigint
+FROM ordered_table
+ORDER BY c_bigint
+limit 5;
+----
+0
+1
+2
+3
+4
+
+query TT
+EXPLAIN SELECT CAST(c as BIGINT) as c_bigint
+FROM ordered_table
+ORDER BY c_bigint
+limit 5;
+----
+logical_plan
+01)Limit: skip=0, fetch=5
+02)--Sort: c_bigint ASC NULLS LAST, fetch=5
+03)----Projection: CAST(ordered_table.c AS Int64) AS c_bigint
+04)------TableScan: ordered_table projection=[c]
+physical_plan
+01)GlobalLimitExec: skip=0, fetch=5
+02)--SortPreservingMergeExec: [c_bigint@0 ASC NULLS LAST], fetch=5
+03)----ProjectionExec: expr=[CAST(c@0 AS Int64) as c_bigint]
+04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], 
output_ordering=[c@0 ASC NULLS LAST], has_header=true
+
+statement ok
+drop table ordered_table; 
+
+
+# ABS(x) breaks the ordering if x's range contains both negative and positive 
values.
+# Since x is defined as INT, its range is assumed to be from NEG_INF to INF.
+statement ok
+CREATE EXTERNAL TABLE ordered_table (
+  a0 INT,
+  a INT,
+  b INT,
+  c INT,
+  d INT
+)
+STORED AS CSV
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv'
+OPTIONS ('format.has_header' 'true');
+
+query TT
+EXPLAIN SELECT ABS(c) as abs_c
+FROM ordered_table
+ORDER BY abs_c
+limit 5;
+----
+logical_plan
+01)Limit: skip=0, fetch=5
+02)--Sort: abs_c ASC NULLS LAST, fetch=5
+03)----Projection: abs(ordered_table.c) AS abs_c
+04)------TableScan: ordered_table projection=[c]
+physical_plan
+01)GlobalLimitExec: skip=0, fetch=5
+02)--SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5
+03)----SortExec: TopK(fetch=5), expr=[abs_c@0 ASC NULLS LAST], 
preserve_partitioning=[true]
+04)------ProjectionExec: expr=[abs(c@0) as abs_c]
+05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+06)----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], 
output_ordering=[c@0 ASC NULLS LAST], has_header=true
+
+statement ok
+drop table ordered_table; 
+
+# ABS(x) preserves the ordering if x's range contains only non-negative values.
+# Since x is defined as INT UNSIGNED, its range is assumed to be from 0 to INF.
+statement ok
+CREATE EXTERNAL TABLE ordered_table (
+  a0 INT,
+  a INT,
+  b INT,
+  c INT UNSIGNED,
+  d INT
+)
+STORED AS CSV
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv'
+OPTIONS ('format.has_header' 'true');
+
+query TT
+EXPLAIN SELECT ABS(c) as abs_c
+FROM ordered_table
+ORDER BY abs_c
+limit 5;
+----
+logical_plan
+01)Limit: skip=0, fetch=5
+02)--Sort: abs_c ASC NULLS LAST, fetch=5
+03)----Projection: abs(ordered_table.c) AS abs_c
+04)------TableScan: ordered_table projection=[c]
+physical_plan
+01)GlobalLimitExec: skip=0, fetch=5
+02)--SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5
+03)----ProjectionExec: expr=[abs(c@0) as abs_c]
+04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], 
output_ordering=[c@0 ASC NULLS LAST], has_header=true
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to