This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fd97799ddc Make Physical CastExpr Field-aware and unify cast semantics across physical expressions (#20814)
fd97799ddc is described below

commit fd97799ddc06347e25b0a2285a6b76e1c0d887c6
Author: kosiew <[email protected]>
AuthorDate: Tue Mar 10 09:15:51 2026 +0800

    Make Physical CastExpr Field-aware and unify cast semantics across physical expressions (#20814)
    
    ## Which issue does this PR close?
    
    * Part of #20164
    
    ## Rationale for this change
    
    Physical `CastExpr` previously stored only a target `DataType`. This
    caused field-level semantics (name, nullability, and metadata) to be
    lost when casts were represented in the physical layer. In contrast,
    logical expressions already carry this information through `FieldRef`.
    
    This mismatch created several issues:
    
    * Physical and logical cast representations diverged in how they
    preserve schema semantics.
    * Struct casting logic behaved differently depending on whether the cast
    was represented as `CastExpr` or `CastColumnExpr`.
    * Downstream components (such as schema rewriting and ordering
    equivalence analysis) required additional branching and duplicated
    logic.
    
    Making `CastExpr` field-aware aligns the physical representation with
    logical semantics and enables consistent schema propagation across
    execution planning and expression evaluation.
    
    ## What changes are included in this PR?
    
    This PR introduces field-aware semantics to `CastExpr` and simplifies
    several areas that previously relied on type-only casting.
    
    Key changes include:
    
    1. **Field-aware CastExpr**
    
    * Replace the `cast_type: DataType` field with `target_field: FieldRef`.
    * Add `new_with_target_field` constructor to explicitly construct
    field-aware casts.
    * Keep the existing `new(expr, DataType)` constructor as a compatibility
    shim that creates a canonical field.
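    As a rough standalone sketch, the two-constructor pattern looks like the
    following (using trimmed-down, hypothetical stand-ins for Arrow's
    `Field`/`FieldRef` rather than the real types):

    ```rust
    use std::collections::HashMap;
    use std::sync::Arc;

    // Hypothetical, simplified stand-ins for Arrow's types, only to
    // illustrate the constructor pattern; the real ones live in `arrow`.
    struct Field {
        name: String,
        data_type: DataType,
        nullable: bool,
        metadata: HashMap<String, String>,
    }

    enum DataType {
        Int64,
    }

    type FieldRef = Arc<Field>;

    struct CastExpr {
        target_field: FieldRef,
    }

    impl CastExpr {
        /// Compatibility shim: synthesizes a canonical field that is
        /// unnamed, nullable, and carries no metadata, matching legacy
        /// type-only casts.
        fn new(cast_type: DataType) -> Self {
            Self::new_with_target_field(Arc::new(Field {
                name: String::new(),
                data_type: cast_type,
                nullable: true,
                metadata: HashMap::new(),
            }))
        }

        /// Field-aware constructor: the target field is used verbatim, so
        /// its name, nullability, and metadata survive into the output
        /// schema.
        fn new_with_target_field(target_field: FieldRef) -> Self {
            Self { target_field }
        }
    }

    fn main() {
        let legacy = CastExpr::new(DataType::Int64);
        assert!(legacy.target_field.name.is_empty());
        assert!(legacy.target_field.nullable);

        let explicit = CastExpr::new_with_target_field(Arc::new(Field {
            name: "score".to_string(),
            data_type: DataType::Int64,
            nullable: false,
            metadata: HashMap::from([("unit".to_string(), "points".to_string())]),
        }));
        assert_eq!(explicit.target_field.name, "score");
        assert!(!explicit.target_field.nullable);
        println!("ok");
    }
    ```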
    
    2. **Return-field and nullability behavior**
    
    * `return_field` now returns the full `target_field`, preserving name,
    nullability, and metadata.
    * `nullable()` now combines the nullability of the input expression with
    that of the resolved target field, rather than consulting the input
    expression alone.
    * Add compatibility logic for legacy type-only casts to preserve
    previous behavior.
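    The nullability rule reduces to a conservative OR of the two sides; as a
    minimal sketch:

    ```rust
    // The conservative nullability rule described above, as a standalone
    // sketch: the cast result is nullable when either the child expression
    // can produce nulls or the resolved target field permits them, so an
    // optimizer never assumes a non-null result a null input could violate.
    fn cast_nullable(child_nullable: bool, target_nullable: bool) -> bool {
        child_nullable || target_nullable
    }

    fn main() {
        // A non-nullable target field cannot mask a nullable child.
        assert!(cast_nullable(true, false));
        // A nullable target makes the result nullable even for a non-null child.
        assert!(cast_nullable(false, true));
        // Only when both sides forbid nulls is the result non-nullable.
        assert!(!cast_nullable(false, false));
        println!("ok");
    }
    ```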
    
    3. **Struct cast validation improvements**
    
    * Struct-to-struct casting now validates compatibility using field
    information before execution.
    * Planning-time validation prevents unsupported casts from reaching
    execution.
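    The name-based compatibility check can be sketched as follows
    (hypothetical, simplified version; the real
    `validate_struct_compatibility` in `datafusion_common` also verifies
    that matched leaf types are castable, which this sketch omits):

    ```rust
    use std::collections::HashMap;

    // Simplified stand-ins for Arrow's Field/DataType, for illustration only.
    enum DataType {
        Int64,
        Utf8,
        Struct(Vec<Field>),
    }

    struct Field {
        name: String,
        data_type: DataType,
        nullable: bool,
    }

    // Every target field must match a source field by name (recursing into
    // nested structs); missing target fields are allowed only when nullable
    // (filled with nulls), and extra source fields are ignored.
    fn validate_struct_compatibility(source: &[Field], target: &[Field]) -> Result<(), String> {
        let by_name: HashMap<&str, &Field> =
            source.iter().map(|f| (f.name.as_str(), f)).collect();
        for t in target {
            match by_name.get(t.name.as_str()) {
                Some(s) => {
                    // Recurse into nested structs; leaf-type castability is
                    // assumed here for brevity.
                    if let (DataType::Struct(sf), DataType::Struct(tf)) =
                        (&s.data_type, &t.data_type)
                    {
                        validate_struct_compatibility(sf, tf)?;
                    }
                }
                None if t.nullable => {} // missing field: filled with nulls
                None => return Err(format!("missing non-nullable field '{}'", t.name)),
            }
        }
        Ok(())
    }

    fn main() {
        let field = |name: &str, dt: DataType, nullable: bool| Field {
            name: name.to_string(),
            data_type: dt,
            nullable,
        };
        let source = vec![
            field("x", DataType::Int64, true),
            field("y", DataType::Utf8, true),
        ];
        // Reordered fields and a missing-but-nullable field are accepted.
        assert!(validate_struct_compatibility(
            &source,
            &[field("y", DataType::Utf8, true), field("z", DataType::Int64, true)]
        )
        .is_ok());
        // A missing non-nullable field is rejected at planning time.
        assert!(
            validate_struct_compatibility(&source, &[field("z", DataType::Int64, false)])
                .is_err()
        );
        println!("ok");
    }
    ```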
    
    4. **Shared cast property logic**
    
    * Introduce shared helper functions (`cast_expr_properties`,
    `is_order_preserving_cast_family`) for determining ordering
    preservation.
    * Reuse this logic in both `CastExpr` and `CastColumnExpr` to avoid
    duplicated implementations.
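    A standalone sketch of the shared predicate, using a trimmed-down
    `DataType` stand-in rather than Arrow's:

    ```rust
    // Hypothetical, simplified DataType; the real predicate in the patch
    // operates on `arrow::datatypes::DataType`.
    #[derive(PartialEq)]
    enum DataType {
        Boolean,
        Int32,
        Int64,
        Float64,
        Utf8,
        Date32,
        TimestampMs,
    }

    impl DataType {
        fn is_numeric(&self) -> bool {
            matches!(self, DataType::Int32 | DataType::Int64 | DataType::Float64)
        }
        fn is_temporal(&self) -> bool {
            matches!(self, DataType::Date32 | DataType::TimestampMs)
        }
    }

    // Mirrors the shape of `is_order_preserving_cast_family`: casts within
    // the numeric family (including boolean-to-numeric), within the temporal
    // family, or to the identical type keep the child's sort order intact.
    fn is_order_preserving_cast_family(source: &DataType, target: &DataType) -> bool {
        (source.is_numeric() || *source == DataType::Boolean) && target.is_numeric()
            || source.is_temporal() && target.is_temporal()
            || source == target
    }

    fn main() {
        assert!(is_order_preserving_cast_family(&DataType::Int32, &DataType::Int64));
        assert!(is_order_preserving_cast_family(&DataType::Boolean, &DataType::Float64));
        assert!(is_order_preserving_cast_family(&DataType::Date32, &DataType::TimestampMs));
        assert!(is_order_preserving_cast_family(&DataType::Utf8, &DataType::Utf8));
        // Casting a number to a string does not preserve numeric ordering.
        assert!(!is_order_preserving_cast_family(&DataType::Int32, &DataType::Utf8));
        println!("ok");
    }
    ```

    Centralizing this check means `CastExpr` and `CastColumnExpr` cannot
    drift apart in how they decide ordering preservation.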
    
    5. **Schema rewriter improvements**
    
    * Refactor physical column resolution into `resolve_physical_column`.
    * Simplify cast insertion logic when logical and physical fields differ.
    * Pass explicit physical and logical fields to cast creation for
    improved correctness.
    
    6. **Ordering equivalence simplification**
    
    * Introduce `substitute_cast_like_ordering` helper to unify handling of
    `CastExpr` and `CastColumnExpr` in ordering equivalence analysis.
    
    7. **Additional unit tests**
    
    * Validate metadata propagation through `return_field`.
    * Verify nullability behavior for field-aware casts.
    * Ensure legacy type-only casts preserve existing semantics.
    * Test struct-cast validation with nested field semantics.
    
    ## Are these changes tested?
    
    Yes.
    
    New unit tests were added in `physical-expr/src/expressions/cast.rs` to
    verify:
    
    * Metadata propagation through field-aware casts
    * Correct nullability behavior derived from the target field
    * Backward compatibility with legacy type-only constructors
    * Struct cast compatibility validation using nested fields
    
    Existing tests continue to pass and validate compatibility with the
    previous API behavior.
    
    ## Are there any user-facing changes?
    
    There are no direct user-facing behavior changes.
    
    This change primarily improves internal schema semantics and consistency
    in the physical expression layer. Existing APIs remain compatible
    through the legacy constructor that accepts only a `DataType`.
    
    ## LLM-generated code disclosure
    
    This PR includes LLM-generated code and comments. All LLM-generated
    content has been manually reviewed and tested.
---
 .../physical-expr-adapter/src/schema_rewriter.rs   | 122 +++++-----
 .../src/equivalence/properties/mod.rs              |  65 +++---
 datafusion/physical-expr/src/expressions/cast.rs   | 260 +++++++++++++++++----
 .../physical-expr/src/expressions/cast_column.rs   |  16 +-
 4 files changed, 319 insertions(+), 144 deletions(-)

diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs 
b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
index e1f45bb0d1..a2a45cbdfe 100644
--- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs
+++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
 
 use arrow::array::RecordBatch;
 use arrow::compute::can_cast_types;
-use arrow::datatypes::{DataType, Field, SchemaRef};
+use arrow::datatypes::{DataType, Field, FieldRef, SchemaRef};
 use datafusion_common::{
     Result, ScalarValue, exec_err,
     metadata::FieldMetadata,
@@ -404,71 +404,76 @@ impl DefaultPhysicalExprAdapterRewriter {
             }
         };
 
-        // Check if the column exists in the physical schema
-        let physical_column_index = match self
-            .physical_file_schema
-            .index_of(column.name())
-        {
-            Ok(index) => index,
-            Err(_) => {
-                if !logical_field.is_nullable() {
-                    return exec_err!(
-                        "Non-nullable column '{}' is missing from the physical 
schema",
-                        column.name()
-                    );
-                }
-                // If the column is missing from the physical schema fill it 
in with nulls.
-                // For a different behavior, provide a custom 
`PhysicalExprAdapter` implementation.
-                let null_value = 
ScalarValue::Null.cast_to(logical_field.data_type())?;
-                return Ok(Transformed::yes(Arc::new(
-                    expressions::Literal::new_with_metadata(
-                        null_value,
-                        Some(FieldMetadata::from(logical_field)),
-                    ),
-                )));
+        let Some((resolved_column, physical_field)) =
+            self.resolve_physical_column(column)?
+        else {
+            if !logical_field.is_nullable() {
+                return exec_err!(
+                    "Non-nullable column '{}' is missing from the physical 
schema",
+                    column.name()
+                );
             }
+            // If the column is missing from the physical schema fill it in 
with nulls.
+            // For a different behavior, provide a custom 
`PhysicalExprAdapter` implementation.
+            let null_value = 
ScalarValue::Null.cast_to(logical_field.data_type())?;
+            return Ok(Transformed::yes(Arc::new(
+                expressions::Literal::new_with_metadata(
+                    null_value,
+                    Some(FieldMetadata::from(logical_field)),
+                ),
+            )));
         };
-        let physical_field = 
self.physical_file_schema.field(physical_column_index);
 
-        if column.index() == physical_column_index && logical_field == 
physical_field {
+        if resolved_column.index() == column.index()
+            && logical_field == physical_field.as_ref()
+        {
             return Ok(Transformed::no(expr));
         }
 
-        let column = self.resolve_column(column, physical_column_index)?;
-
-        if logical_field == physical_field {
+        if logical_field == physical_field.as_ref() {
             // If the fields match (including metadata/nullability), we can 
use the column as is
-            return Ok(Transformed::yes(Arc::new(column)));
+            return Ok(Transformed::yes(Arc::new(resolved_column)));
         }
 
-        if logical_field.data_type() == physical_field.data_type() {
-            // The data type matches, but the field metadata / nullability 
differs.
-            // Emit a CastColumnExpr so downstream schema construction uses 
the logical field.
-            return self.create_cast_column_expr(column, logical_field);
-        }
-
-        // We need to cast the column to the logical data type
+        // We need a cast expression whenever the logical and physical fields 
differ,
+        // whether that difference is only metadata/nullability or also data 
type.
         // TODO: add optimization to move the cast from the column to literal 
expressions in the case of `col = 123`
        // since that's much cheaper to evaluate.
         // See 
https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928
-        self.create_cast_column_expr(column, logical_field)
+        self.create_cast_column_expr(resolved_column, physical_field, 
logical_field)
     }
 
-    /// Resolves a column expression, handling index and type mismatches.
-    ///
-    /// Returns the appropriate Column expression when the column's index or 
data type
-    /// don't match the physical schema. Assumes that the early-exit case 
(both index
-    /// and type match) has already been checked by the caller.
-    fn resolve_column(
+    /// Resolves a logical column to the corresponding physical column and 
field.
+    fn resolve_physical_column(
         &self,
         column: &Column,
-        physical_column_index: usize,
-    ) -> Result<Column> {
-        if column.index() == physical_column_index {
-            Ok(column.clone())
+    ) -> Result<Option<(Column, FieldRef)>> {
+        // The physical schema adaptation step intentionally resolves columns 
by **name first**
+        // rather than trusting the incoming index. This mirrors what the code
+        // did before `resolve_physical_column()` was extracted: the planner 
might hand us a
+        // `Column` whose `index` field is stale (e.g. after projection/rename 
rewrites), so
+        // resolving by name ensures we match the correct physical slot. Once 
we know the
+        // proper index we rebuild the `Column` with `new_with_schema` so 
callers can rely
+        // on `column.index()` later without having to re-query the schema.
+        let Ok(physical_column_index) = 
self.physical_file_schema.index_of(column.name())
+        else {
+            return Ok(None);
+        };
+
+        let column = if column.index() == physical_column_index {
+            column.clone()
         } else {
-            Column::new_with_schema(column.name(), 
self.physical_file_schema.as_ref())
-        }
+            Column::new_with_schema(column.name(), 
self.physical_file_schema.as_ref())?
+        };
+
+        Ok(Some((
+            column,
+            Arc::new(
+                self.physical_file_schema
+                    .field(physical_column_index)
+                    .clone(),
+            ),
+        )))
     }
 
     /// Validates type compatibility and creates a CastColumnExpr if needed.
@@ -479,19 +484,15 @@ impl DefaultPhysicalExprAdapterRewriter {
     fn create_cast_column_expr(
         &self,
         column: Column,
+        physical_field: FieldRef,
         logical_field: &Field,
     ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
-        // Look up the column index in the physical schema by name to ensure 
correctness.
-        let physical_column_index = 
self.physical_file_schema.index_of(column.name())?;
-        let actual_physical_field =
-            self.physical_file_schema.field(physical_column_index);
-
         // For struct types, use validate_struct_compatibility which handles:
         // - Missing fields in source (filled with nulls)
         // - Extra fields in source (ignored)
         // - Recursive validation of nested structs
         // For non-struct types, use Arrow's can_cast_types
-        match (actual_physical_field.data_type(), logical_field.data_type()) {
+        match (physical_field.data_type(), logical_field.data_type()) {
             (DataType::Struct(physical_fields), 
DataType::Struct(logical_fields)) => {
                 validate_struct_compatibility(
                     physical_fields.as_ref(),
@@ -499,15 +500,13 @@ impl DefaultPhysicalExprAdapterRewriter {
                 )?;
             }
             _ => {
-                let is_compatible = can_cast_types(
-                    actual_physical_field.data_type(),
-                    logical_field.data_type(),
-                );
+                let is_compatible =
+                    can_cast_types(physical_field.data_type(), 
logical_field.data_type());
                 if !is_compatible {
                     return exec_err!(
                         "Cannot cast column '{}' from '{}' (physical data 
type) to '{}' (logical data type)",
                         column.name(),
-                        actual_physical_field.data_type(),
+                        physical_field.data_type(),
                         logical_field.data_type()
                     );
                 }
@@ -516,7 +515,7 @@ impl DefaultPhysicalExprAdapterRewriter {
 
         let cast_expr = Arc::new(CastColumnExpr::new(
             Arc::new(column),
-            Arc::new(actual_physical_field.clone()),
+            physical_field,
             Arc::new(logical_field.clone()),
             None,
         ));
@@ -1604,6 +1603,7 @@ mod tests {
         let transformed = rewriter
             .create_cast_column_expr(
                 Column::new("a", 0),
+                
Arc::new(physical_schema.field_with_name("a").unwrap().clone()),
                 logical_schema.field_with_name("a").unwrap(),
             )
             .unwrap();
diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs 
b/datafusion/physical-expr/src/equivalence/properties/mod.rs
index e9ed0b226f..1ca4ead033 100644
--- a/datafusion/physical-expr/src/equivalence/properties/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs
@@ -39,7 +39,7 @@ use crate::{
     PhysicalSortRequirement,
 };
 
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{DataType, SchemaRef};
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{Constraint, Constraints, HashMap, Result, plan_err};
 use datafusion_expr::interval_arithmetic::Interval;
@@ -195,6 +195,39 @@ impl OrderingEquivalenceCache {
 }
 
 impl EquivalenceProperties {
+    /// Helper used by the ordering equivalence rule when considering whether a
+    /// cast-bearing expression can replace an existing sort key without 
invalidating
+    /// the ordering.
+    ///
+    /// This function handles *both* `CastExpr` (generic cast) and
+    /// `CastColumnExpr` (field-aware cast) because the planner may introduce 
either
+    /// form during rewrite steps; the core logic is the same in both cases.  
The
+    /// substitution is only allowed when the cast wraps **the very same child
+    /// expression** that the original sort used (an exact-child-match 
invariant),
+    /// and the cast type must be a widening/order-preserving conversion, as
+    /// `CastExpr::check_bigger_cast(...)` ensures.  Without those 
restrictions the
+    /// existing sort order could be violated (e.g. a narrowing cast could 
collapse
+    /// distinct values together).
+    fn substitute_cast_like_ordering(
+        r_expr: Arc<dyn PhysicalExpr>,
+        sort_expr: &PhysicalSortExpr,
+        expr_type: &DataType,
+    ) -> Option<PhysicalSortExpr> {
+        let (child_expr, cast_type) = if let Some(cast_expr) =
+            r_expr.as_any().downcast_ref::<CastExpr>()
+        {
+            (cast_expr.expr(), cast_expr.cast_type())
+        } else if let Some(cast_expr) = 
r_expr.as_any().downcast_ref::<CastColumnExpr>() {
+            (cast_expr.expr(), cast_expr.target_field().data_type())
+        } else {
+            return None;
+        };
+
+        (child_expr.eq(&sort_expr.expr)
+            && CastExpr::check_bigger_cast(cast_type, expr_type))
+        .then(|| PhysicalSortExpr::new(r_expr, sort_expr.options))
+    }
+
     /// Creates an empty `EquivalenceProperties` object.
     pub fn new(schema: SchemaRef) -> Self {
         Self {
@@ -844,32 +877,10 @@ impl EquivalenceProperties {
                     let expr_type = sort_expr.expr.data_type(schema).unwrap();
                     // TODO: Add one-to-one analysis for ScalarFunctions.
                     for r_expr in referring_exprs {
-                        // We check whether this expression is substitutable.
-                        if let Some(cast_expr) =
-                            r_expr.as_any().downcast_ref::<CastExpr>()
-                        {
-                            // For casts, we need to know whether the cast
-                            // expression matches:
-                            if cast_expr.expr.eq(&sort_expr.expr)
-                                && cast_expr.is_bigger_cast(&expr_type)
-                            {
-                                result.push(PhysicalSortExpr::new(
-                                    r_expr,
-                                    sort_expr.options,
-                                ));
-                            }
-                        } else if let Some(cast_expr) =
-                            r_expr.as_any().downcast_ref::<CastColumnExpr>()
-                        {
-                            let cast_type = 
cast_expr.target_field().data_type();
-                            if cast_expr.expr().eq(&sort_expr.expr)
-                                && CastExpr::check_bigger_cast(cast_type, 
&expr_type)
-                            {
-                                result.push(PhysicalSortExpr::new(
-                                    r_expr,
-                                    sort_expr.options,
-                                ));
-                            }
+                        if let Some(substituted) = 
Self::substitute_cast_like_ordering(
+                            r_expr, &sort_expr, &expr_type,
+                        ) {
+                            result.push(substituted);
                         }
                     }
                     result.push(sort_expr);
diff --git a/datafusion/physical-expr/src/expressions/cast.rs 
b/datafusion/physical-expr/src/expressions/cast.rs
index 2d44215cf2..5a80daf663 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -25,6 +25,7 @@ use crate::physical_expr::PhysicalExpr;
 use arrow::compute::{CastOptions, can_cast_types};
 use arrow::datatypes::{DataType, DataType::*, FieldRef, Schema};
 use arrow::record_batch::RecordBatch;
+use datafusion_common::datatype::DataTypeExt;
 use datafusion_common::format::DEFAULT_FORMAT_OPTIONS;
 use datafusion_common::nested_struct::validate_struct_compatibility;
 use datafusion_common::{Result, not_impl_err};
@@ -63,8 +64,8 @@ fn can_cast_struct_types(source: &DataType, target: 
&DataType) -> bool {
 pub struct CastExpr {
     /// The expression to cast
     pub expr: Arc<dyn PhysicalExpr>,
-    /// The data type to cast to
-    cast_type: DataType,
+    /// Field metadata describing the desired output after casting
+    target_field: FieldRef,
     /// Cast options
     cast_options: CastOptions<'static>,
 }
@@ -73,7 +74,7 @@ pub struct CastExpr {
 impl PartialEq for CastExpr {
     fn eq(&self, other: &Self) -> bool {
         self.expr.eq(&other.expr)
-            && self.cast_type.eq(&other.cast_type)
+            && self.target_field.eq(&other.target_field)
             && self.cast_options.eq(&other.cast_options)
     }
 }
@@ -81,21 +82,55 @@ impl PartialEq for CastExpr {
 impl Hash for CastExpr {
     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
         self.expr.hash(state);
-        self.cast_type.hash(state);
+        self.target_field.hash(state);
         self.cast_options.hash(state);
     }
 }
 
 impl CastExpr {
-    /// Create a new CastExpr
+    /// Create a new `CastExpr` using only a `DataType`.
+    ///
+    /// This constructor is provided for compatibility with existing call sites
+    /// that only know the target type.  It synthesizes a ``Field`` with the
+    /// given type (**nullable by default**) and no name metadata.  Callers 
that
+    /// already have a `FieldRef` (for example, coming from schema inference 
or a
+    /// resolved column) should prefer [`CastExpr::new_with_target_field`], 
which
+    /// preserves the field's name, nullability, and other metadata.  In other
+    /// words:
+    ///
+    /// * use `new()` when only a `DataType` is available and you want the 
legacy
+    ///   semantics of a type-only cast
+    /// * use `new_with_target_field()` when you need explicit field
+    ///   metadata/name/nullability preserved
     pub fn new(
         expr: Arc<dyn PhysicalExpr>,
         cast_type: DataType,
         cast_options: Option<CastOptions<'static>>,
+    ) -> Self {
+        Self::new_with_target_field(
+            expr,
+            cast_type.into_nullable_field_ref(),
+            cast_options,
+        )
+    }
+
+    /// Create a new `CastExpr` with an explicit target `FieldRef`.
+    ///
+    /// The provided `target_field` is used verbatim for the expression's
+    /// return schema, so the field's name, nullability, and other metadata are
+    /// preserved.  This is the preferred constructor when the caller already
+    /// has field information (for example, during logical-to-physical 
planning).
+    ///
+    /// See [`CastExpr::new`] for the compatibility constructor that only 
accepts
+    /// a `DataType`.
+    pub fn new_with_target_field(
+        expr: Arc<dyn PhysicalExpr>,
+        target_field: FieldRef,
+        cast_options: Option<CastOptions<'static>>,
     ) -> Self {
         Self {
             expr,
-            cast_type,
+            target_field,
             cast_options: cast_options.unwrap_or(DEFAULT_CAST_OPTIONS),
         }
     }
@@ -107,7 +142,12 @@ impl CastExpr {
 
     /// The data type to cast to
     pub fn cast_type(&self) -> &DataType {
-        &self.cast_type
+        self.target_field.data_type()
+    }
+
+    /// Field metadata describing the output column after casting.
+    pub fn target_field(&self) -> &FieldRef {
+        &self.target_field
     }
 
     /// The cast options
@@ -115,6 +155,27 @@ impl CastExpr {
         &self.cast_options
     }
 
+    fn is_default_target_field(&self) -> bool {
+        self.target_field.name().is_empty()
+            && self.target_field.is_nullable()
+            && self.target_field.metadata().is_empty()
+    }
+
+    fn resolved_target_field(&self, input_schema: &Schema) -> Result<FieldRef> 
{
+        if self.is_default_target_field() {
+            self.expr.return_field(input_schema).map(|field| {
+                Arc::new(
+                    field
+                        .as_ref()
+                        .clone()
+                        .with_data_type(self.cast_type().clone()),
+                )
+            })
+        } else {
+            Ok(Arc::clone(&self.target_field))
+        }
+    }
+
     /// Check if casting from the specified source type to the target type is a
     /// widening cast (e.g. from `Int8` to `Int16`).
     pub fn check_bigger_cast(cast_type: &DataType, src: &DataType) -> bool {
@@ -140,13 +201,34 @@ impl CastExpr {
 
     /// Check if the cast is a widening cast (e.g. from `Int8` to `Int16`).
     pub fn is_bigger_cast(&self, src: &DataType) -> bool {
-        Self::check_bigger_cast(&self.cast_type, src)
+        Self::check_bigger_cast(self.cast_type(), src)
+    }
+}
+
+pub(crate) fn is_order_preserving_cast_family(
+    source_type: &DataType,
+    target_type: &DataType,
+) -> bool {
+    (source_type.is_numeric() || *source_type == Boolean) && 
target_type.is_numeric()
+        || source_type.is_temporal() && target_type.is_temporal()
+        || source_type.eq(target_type)
+}
+
+pub(crate) fn cast_expr_properties(
+    child: &ExprProperties,
+    target_type: &DataType,
+) -> Result<ExprProperties> {
+    let unbounded = Interval::make_unbounded(target_type)?;
+    if is_order_preserving_cast_family(&child.range.data_type(), target_type) {
+        Ok(child.clone().with_range(unbounded))
+    } else {
+        Ok(ExprProperties::new_unknown().with_range(unbounded))
     }
 }
 
 impl fmt::Display for CastExpr {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "CAST({} AS {})", self.expr, self.cast_type)
+        write!(f, "CAST({} AS {})", self.expr, self.cast_type())
     }
 }
 
@@ -157,26 +239,27 @@ impl PhysicalExpr for CastExpr {
     }
 
     fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
-        Ok(self.cast_type.clone())
+        Ok(self.cast_type().clone())
     }
 
     fn nullable(&self, input_schema: &Schema) -> Result<bool> {
-        self.expr.nullable(input_schema)
+        // A cast is nullable if **either** the child is nullable or the
+        // target field allows nulls.  This conservative rule prevents
+        // optimizers from assuming a non-null result when a null input could
+        // still propagate.  `return_field()` continues to expose the exact
+        // target metadata separately.
+        let child_nullable = self.expr.nullable(input_schema)?;
+        let target_nullable = 
self.resolved_target_field(input_schema)?.is_nullable();
+        Ok(child_nullable || target_nullable)
     }
 
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let value = self.expr.evaluate(batch)?;
-        value.cast_to(&self.cast_type, Some(&self.cast_options))
+        value.cast_to(self.cast_type(), Some(&self.cast_options))
     }
 
     fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
-        Ok(self
-            .expr
-            .return_field(input_schema)?
-            .as_ref()
-            .clone()
-            .with_data_type(self.cast_type.clone())
-            .into())
+        self.resolved_target_field(input_schema)
     }
 
     fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
@@ -187,16 +270,16 @@ impl PhysicalExpr for CastExpr {
         self: Arc<Self>,
         children: Vec<Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
-        Ok(Arc::new(CastExpr::new(
+        Ok(Arc::new(CastExpr::new_with_target_field(
             Arc::clone(&children[0]),
-            self.cast_type.clone(),
+            Arc::clone(&self.target_field),
             Some(self.cast_options.clone()),
         )))
     }
 
     fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
         // Cast current node's interval to the right type:
-        children[0].cast_to(&self.cast_type, &self.cast_options)
+        children[0].cast_to(self.cast_type(), &self.cast_options)
     }
 
     fn propagate_constraints(
@@ -215,25 +298,13 @@ impl PhysicalExpr for CastExpr {
     /// A [`CastExpr`] preserves the ordering of its child if the cast is done
     /// under the same datatype family.
     fn get_properties(&self, children: &[ExprProperties]) -> 
Result<ExprProperties> {
-        let source_datatype = children[0].range.data_type();
-        let target_type = &self.cast_type;
-
-        let unbounded = Interval::make_unbounded(target_type)?;
-        if (source_datatype.is_numeric() || source_datatype == Boolean)
-            && target_type.is_numeric()
-            || source_datatype.is_temporal() && target_type.is_temporal()
-            || source_datatype.eq(target_type)
-        {
-            Ok(children[0].clone().with_range(unbounded))
-        } else {
-            Ok(ExprProperties::new_unknown().with_range(unbounded))
-        }
+        cast_expr_properties(&children[0], self.cast_type())
     }
 
     fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "CAST(")?;
         self.expr.fmt_sql(f)?;
-        write!(f, " AS {:?}", self.cast_type)?;
+        write!(f, " AS {:?}", self.cast_type())?;
 
         write!(f, ")")
     }
@@ -252,14 +323,18 @@ pub fn cast_with_options(
     let expr_type = expr.data_type(input_schema)?;
     if expr_type == cast_type {
         Ok(Arc::clone(&expr))
+    } else if matches!((&expr_type, &cast_type), (Struct(_), Struct(_))) {
+        if can_cast_struct_types(&expr_type, &cast_type) {
+            // Allow struct-to-struct casts that pass name-based compatibility 
validation.
+            // This validation is applied at planning time (now) to fail fast, 
rather than
+            // deferring errors to execution time. The name-based casting 
logic will be
+            // executed at runtime via ColumnarValue::cast_to.
+            Ok(Arc::new(CastExpr::new(expr, cast_type, cast_options)))
+        } else {
+            not_impl_err!("Unsupported CAST from {expr_type} to {cast_type}")
+        }
     } else if can_cast_types(&expr_type, &cast_type) {
         Ok(Arc::new(CastExpr::new(expr, cast_type, cast_options)))
-    } else if can_cast_struct_types(&expr_type, &cast_type) {
-        // Allow struct-to-struct casts that pass name-based compatibility 
validation.
-        // This validation is applied at planning time (now) to fail fast, 
rather than
-        // deferring errors to execution time. The name-based casting logic 
will be
-        // executed at runtime via ColumnarValue::cast_to.
-        Ok(Arc::new(CastExpr::new(expr, cast_type, cast_options)))
     } else {
         not_impl_err!("Unsupported CAST from {expr_type} to {cast_type}")
     }
@@ -293,6 +368,7 @@ mod tests {
     };
     use datafusion_physical_expr_common::physical_expr::fmt_sql;
     use insta::assert_snapshot;
+    use std::collections::HashMap;
 
     // runs an end-to-end test of physical type cast
     // 1. construct a record batch with a column "a" of type A
@@ -800,6 +876,106 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn field_aware_cast_return_field_preserves_target_metadata() -> Result<()> 
{
+        let schema = Schema::new(vec![Field::new("a", Int32, false)]);
+        let expr = CastExpr::new_with_target_field(
+            col("a", &schema)?,
+            Arc::new(Field::new("cast_target", Int64, true).with_metadata(
+                HashMap::from([("target_meta".to_string(), "1".to_string())]),
+            )),
+            None,
+        );
+
+        let field = expr.return_field(&schema)?;
+
+        assert_eq!(field.name(), "cast_target");
+        assert_eq!(field.data_type(), &Int64);
+        assert!(field.is_nullable());
+        assert_eq!(
+            field.metadata().get("target_meta").map(String::as_str),
+            Some("1")
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn field_aware_cast_nullable_prefers_child_nullability() -> Result<()> {
+        // When the child expression is nullable the cast must be treated as
+        // nullable even if the explicitly supplied target field is marked
+        // non-nullable.  return_field() still reflects the target metadata.
+        let schema = Schema::new(vec![Field::new("a", Int32, true)]);
+        let expr = CastExpr::new_with_target_field(
+            col("a", &schema)?,
+            Arc::new(Field::new("cast_target", Int64, false)),
+            None,
+        );
+
+        assert!(expr.nullable(&schema)?);
+        assert!(!expr.return_field(&schema)?.is_nullable());
+
+        Ok(())
+    }
+
+    #[test]
+    fn type_only_cast_preserves_legacy_field_name_and_nullability() -> Result<()> {
+        let schema = Schema::new(vec![Field::new("a", Int32, false)]);
+        let expr = CastExpr::new(col("a", &schema)?, Int64, None);
+
+        let field = expr.return_field(&schema)?;
+
+        assert_eq!(field.name(), "a");
+        assert_eq!(field.data_type(), &Int64);
+        assert!(!field.is_nullable());
+        assert!(!expr.nullable(&schema)?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn field_aware_cast_nullable_child_nonnullable_targets_nullable() -> Result<()> {
+        // child is non-nullable but the target field is marked nullable; the
+        // nullable() result should still be true because the field allows nulls.
+        let schema = Schema::new(vec![Field::new("a", Int32, false)]);
+        let expr = CastExpr::new_with_target_field(
+            col("a", &schema)?,
+            Arc::new(Field::new("cast_target", Int64, true)),
+            None,
+        );
+
+        assert!(expr.nullable(&schema)?);
+        assert!(expr.return_field(&schema)?.is_nullable());
+
+        Ok(())
+    }
+
+    #[test]
+    fn struct_cast_validation_uses_nested_target_fields() -> Result<()> {
+        let source_type = Struct(Fields::from(vec![
+            Arc::new(Field::new("x", Int32, true)),
+            Arc::new(Field::new("y", Utf8, true)),
+        ]));
+        let schema = Schema::new(vec![Field::new("a", source_type.clone(), true)]);
+
+        let valid_target = Struct(Fields::from(vec![
+            Arc::new(Field::new("y", Utf8, true)),
+            Arc::new(Field::new("x", Int64, true)),
+        ]));
+        cast_with_options(col("a", &schema)?, &schema, valid_target, None)?;
+
+        let invalid_target = Struct(Fields::from(vec![
+            Arc::new(Field::new("y", Utf8, true)),
+            Arc::new(Field::new("missing", Int64, false)),
+        ]));
+        let err = cast_with_options(col("a", &schema)?, &schema, invalid_target, None)
+            .expect_err("missing required struct field should fail");
+
+        assert!(err.to_string().contains("Unsupported CAST"));
+
+        Ok(())
+    }
+
     #[test]
     #[ignore] // TODO: https://github.com/apache/datafusion/issues/5396
     fn test_cast_decimal() -> Result<()> {
diff --git a/datafusion/physical-expr/src/expressions/cast_column.rs b/datafusion/physical-expr/src/expressions/cast_column.rs
index f6c4d080fc..a99953abdb 100644
--- a/datafusion/physical-expr/src/expressions/cast_column.rs
+++ b/datafusion/physical-expr/src/expressions/cast_column.rs
@@ -17,6 +17,7 @@
 
 //! Physical expression for struct-aware casting of columns.
 
+use super::cast::cast_expr_properties;
 use crate::physical_expr::PhysicalExpr;
 use arrow::{
     compute::CastOptions,
@@ -27,7 +28,6 @@ use datafusion_common::{
     Result, ScalarValue, format::DEFAULT_CAST_OPTIONS, nested_struct::cast_column,
 };
 use datafusion_expr_common::columnar_value::ColumnarValue;
-use datafusion_expr_common::interval_arithmetic::Interval;
 use datafusion_expr_common::sort_properties::ExprProperties;
 use std::{
     any::Any,
@@ -182,19 +182,7 @@ impl PhysicalExpr for CastColumnExpr {
     /// A [`CastColumnExpr`] preserves the ordering of its child if the cast is done
     /// under the same datatype family.
     fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
-        let source_datatype = children[0].range.data_type();
-        let target_type = self.target_field.data_type();
-
-        let unbounded = Interval::make_unbounded(target_type)?;
-        if (source_datatype.is_numeric() || source_datatype == DataType::Boolean)
-            && target_type.is_numeric()
-            || source_datatype.is_temporal() && target_type.is_temporal()
-            || source_datatype.eq(target_type)
-        {
-            Ok(children[0].clone().with_range(unbounded))
-        } else {
-            Ok(ExprProperties::new_unknown().with_range(unbounded))
-        }
+        cast_expr_properties(&children[0], self.target_field.data_type())
     }
 
     fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
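The hunk above deletes the inline ordering check from `CastColumnExpr::get_properties` in favor of the shared `cast_expr_properties` helper. The rule that the helper now owns (numeric/boolean to numeric, temporal to temporal, or identical types preserve the child's sort properties) can be sketched standalone; the `TypeClass` enum and `preserves_ordering` function below are illustrative stand-ins for the Arrow `DataType` predicates, not DataFusion APIs:

```rust
// Sketch of the ordering-preservation rule shared by CastExpr and
// CastColumnExpr: a cast keeps its child's sort properties only when the
// source and target types belong to the same "datatype family".
#[derive(Clone, Copy, PartialEq, Debug)]
enum TypeClass {
    Numeric,
    Boolean,
    Temporal,
    Other,
}

/// True when a cast from `source` to `target` preserves ordering:
/// numeric or boolean -> numeric, temporal -> temporal, or equal classes.
fn preserves_ordering(source: TypeClass, target: TypeClass) -> bool {
    ((source == TypeClass::Numeric || source == TypeClass::Boolean)
        && target == TypeClass::Numeric)
        || (source == TypeClass::Temporal && target == TypeClass::Temporal)
        || source == target
}

fn main() {
    // Widening Int32 -> Int64 style casts keep ordering.
    assert!(preserves_ordering(TypeClass::Numeric, TypeClass::Numeric));
    // Boolean -> numeric is also monotonic.
    assert!(preserves_ordering(TypeClass::Boolean, TypeClass::Numeric));
    // Crossing families (e.g. numeric -> temporal) does not.
    assert!(!preserves_ordering(TypeClass::Numeric, TypeClass::Temporal));
}
```

In the real helper, a mismatch does not fail the cast; it just returns `ExprProperties::new_unknown()` with an unbounded interval, discarding the child's ordering information.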


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

