adriangb commented on code in PR #20814:
URL: https://github.com/apache/datafusion/pull/20814#discussion_r2903249014


##########
datafusion/physical-expr/src/expressions/cast.rs:
##########
@@ -92,10 +93,23 @@ impl CastExpr {
         expr: Arc<dyn PhysicalExpr>,
         cast_type: DataType,
         cast_options: Option<CastOptions<'static>>,
+    ) -> Self {
+        Self::new_with_target_field(
+            expr,
+            cast_type.into_nullable_field_ref(),
+            cast_options,
+        )
+    }
+
+    /// Create a new CastExpr with an explicit target field.

Review Comment:
   Should we update the docs on `new()` and cross-link the two constructors
(e.g. `if you don't have a field and only have a DataType you can use new() ...`)?
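
   A rough sketch of what the cross-linked docs could look like. The types here are simplified, hypothetical stand-ins (not the real arrow `DataType`/`Field`), the `expr` and `cast_options` parameters are omitted, and the empty field name is an assumption about what a helper like `into_nullable_field_ref()` might produce:

```rust
use std::sync::Arc;

/// Hypothetical stand-in for arrow's `DataType`.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int32,
    Utf8,
}

/// Hypothetical stand-in for arrow's `Field`.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub data_type: DataType,
    pub nullable: bool,
}

pub type FieldRef = Arc<Field>;

/// Simplified cast expression that only tracks its target field.
pub struct CastExpr {
    target_field: FieldRef,
}

impl CastExpr {
    /// Create a new `CastExpr` from a bare [`DataType`].
    ///
    /// The target field is derived from `cast_type` and is always nullable.
    /// If you already have a field (for example to preserve nullability or
    /// metadata), use [`CastExpr::new_with_target_field`] instead.
    pub fn new(cast_type: DataType) -> Self {
        // Assumption: the derived field has an empty name and is nullable.
        Self::new_with_target_field(Arc::new(Field {
            name: String::new(),
            data_type: cast_type,
            nullable: true,
        }))
    }

    /// Create a new `CastExpr` with an explicit target field.
    ///
    /// If you don't have a field and only have a [`DataType`], use
    /// [`CastExpr::new`], which builds a nullable field for you.
    pub fn new_with_target_field(target_field: FieldRef) -> Self {
        Self { target_field }
    }

    /// Field describing the output column after casting.
    pub fn target_field(&self) -> &FieldRef {
        &self.target_field
    }
}
```

   With intra-doc links in both directions, a reader landing on either constructor can find the other from the rendered docs.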



##########
datafusion/physical-expr/src/equivalence/properties/mod.rs:
##########
@@ -844,32 +864,10 @@ impl EquivalenceProperties {
                     let expr_type = sort_expr.expr.data_type(schema).unwrap();
                     // TODO: Add one-to-one analysis for ScalarFunctions.
                     for r_expr in referring_exprs {
-                        // We check whether this expression is substitutable.
-                        if let Some(cast_expr) =
-                            r_expr.as_any().downcast_ref::<CastExpr>()
-                        {
-                            // For casts, we need to know whether the cast

Review Comment:
   We also seem to have lost some code comments here. Again, the originals were 
not super high quality IMO, but the code is complex and some good comments on 
the new version would go a long way.



##########
datafusion/physical-expr-adapter/src/schema_rewriter.rs:
##########
@@ -404,71 +404,69 @@ impl DefaultPhysicalExprAdapterRewriter {
             }
         };
 
-        // Check if the column exists in the physical schema
-        let physical_column_index = match self
-            .physical_file_schema
-            .index_of(column.name())
-        {
-            Ok(index) => index,
-            Err(_) => {
-                if !logical_field.is_nullable() {
-                    return exec_err!(
-                        "Non-nullable column '{}' is missing from the physical schema",
-                        column.name()
-                    );
-                }
-                // If the column is missing from the physical schema fill it in with nulls.
-                // For a different behavior, provide a custom `PhysicalExprAdapter` implementation.
-                let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?;
-                return Ok(Transformed::yes(Arc::new(
-                    expressions::Literal::new_with_metadata(
-                        null_value,
-                        Some(FieldMetadata::from(logical_field)),
-                    ),
-                )));
+        let Some((resolved_column, physical_field)) =
+            self.resolve_physical_column(column)?
+        else {
+            if !logical_field.is_nullable() {
+                return exec_err!(
+                    "Non-nullable column '{}' is missing from the physical schema",
+                    column.name()
+                );
             }
+            // If the column is missing from the physical schema fill it in with nulls.
+            // For a different behavior, provide a custom `PhysicalExprAdapter` implementation.
+            let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?;
+            return Ok(Transformed::yes(Arc::new(
+                expressions::Literal::new_with_metadata(
+                    null_value,
+                    Some(FieldMetadata::from(logical_field)),
+                ),
+            )));
         };
-        let physical_field = self.physical_file_schema.field(physical_column_index);
 
-        if column.index() == physical_column_index && logical_field == physical_field {
+        if resolved_column.index() == column.index()
+            && logical_field == physical_field.as_ref()
+        {
             return Ok(Transformed::no(expr));
         }
 
-        let column = self.resolve_column(column, physical_column_index)?;
-
-        if logical_field == physical_field {
+        if logical_field == physical_field.as_ref() {
             // If the fields match (including metadata/nullability), we can use the column as is
-            return Ok(Transformed::yes(Arc::new(column)));
+            return Ok(Transformed::yes(Arc::new(resolved_column)));
         }
 
-        if logical_field.data_type() == physical_field.data_type() {
-            // The data type matches, but the field metadata / nullability differs.
-            // Emit a CastColumnExpr so downstream schema construction uses the logical field.
-            return self.create_cast_column_expr(column, logical_field);
-        }
-
-        // We need to cast the column to the logical data type
+        // We need a cast expression whenever the logical and physical fields differ,
+        // whether that difference is only metadata/nullability or also data type.
         // TODO: add optimization to move the cast from the column to literal expressions in the case of `col = 123`
         // since that's much cheaper to evalaute.
         // See https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928
-        self.create_cast_column_expr(column, logical_field)
+        self.create_cast_column_expr(resolved_column, physical_field, logical_field)
     }
 
-    /// Resolves a column expression, handling index and type mismatches.
-    ///
-    /// Returns the appropriate Column expression when the column's index or data type
-    /// don't match the physical schema. Assumes that the early-exit case (both index
-    /// and type match) has already been checked by the caller.
-    fn resolve_column(
+    /// Resolves a logical column to the corresponding physical column and field.
+    fn resolve_physical_column(
         &self,
         column: &Column,
-        physical_column_index: usize,
-    ) -> Result<Column> {
-        if column.index() == physical_column_index {
-            Ok(column.clone())
+    ) -> Result<Option<(Column, FieldRef)>> {
+        let Ok(physical_column_index) = self.physical_file_schema.index_of(column.name())

Review Comment:
   We lost the comment `// Look up the column index in the physical schema by 
name to ensure correctness.` It may not have been the most helpful comment, 
but I do think it's worth having something that helps readers understand, in 
places where we use index or name matching, why we chose one and not the 
other. The bugs from that tend to be very subtle.
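
   To illustrate the kind of subtle bug I mean, here is a minimal sketch. It uses plain string slices instead of the real `Schema`, and `resolve_physical_index` is a hypothetical helper, not a function from this PR:

```rust
/// Hypothetical helper: find a column's position in the physical (file)
/// schema by name. Returns `None` when the file does not contain the column.
fn resolve_physical_index(physical_schema: &[&str], column_name: &str) -> Option<usize> {
    // Match by name rather than reusing the logical index: the file may
    // store columns in a different order, or omit some of them entirely.
    physical_schema.iter().position(|name| *name == column_name)
}
```

   For example, with a logical schema `[a, b, c]` and a file schema `[c, a]`, the logical column `a` has index 0, but index 0 in the file is `c`. Reusing the logical index would silently read the wrong column, while the name lookup resolves `a` to index 1 and reports `b` as missing.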



##########
datafusion/physical-expr/src/expressions/cast.rs:
##########
@@ -107,14 +121,40 @@ impl CastExpr {
 
     /// The data type to cast to
     pub fn cast_type(&self) -> &DataType {
-        &self.cast_type
+        self.target_field.data_type()
+    }
+
+    /// Field metadata describing the output column after casting.
+    pub fn target_field(&self) -> &FieldRef {
+        &self.target_field
     }
 
     /// The cast options
     pub fn cast_options(&self) -> &CastOptions<'static> {
         &self.cast_options
     }
 
+    fn is_legacy_target_field(&self) -> bool {

Review Comment:
   Is `legacy` the right word here? Maybe `default()` or `is_helper_field()`? 
Unless we plan on removing that code path, I wouldn't call it `legacy`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
