This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e3d3257fc0 feat: add `datafusion-physical-expr-adapter`, implement predicate adaptation for missing fields of structs (#16589)
e3d3257fc0 is described below

commit e3d3257fc03564f0f4295a299208d8735fe6671c
Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com>
AuthorDate: Fri Aug 15 08:00:20 2025 -0500

    feat: add `datafusion-physical-expr-adapter`, implement predicate adaptation for missing fields of structs (#16589)
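
For context, a minimal sketch of what the new default adapter does for a missing nullable column, adapted from `test_rewrite_missing_column_nullable` in the diff below. The standalone `main` wrapper and the direct crate imports are illustrative assumptions, not part of this commit:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_adapter::{
    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
};

fn main() -> Result<()> {
    // The file on disk only contains column "a" (Int32) ...
    let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    // ... while the table's logical schema also declares a nullable "b" (Utf8).
    let logical_schema = Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Utf8, true),
    ]);

    let factory = DefaultPhysicalExprAdapterFactory;
    let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));

    // A reference to the missing (nullable) column "b" is rewritten to a typed
    // NULL literal, so predicates over "b" can still be evaluated against the file.
    let rewritten = adapter.rewrite(Arc::new(Column::new("b", 1)))?;
    println!("{rewritten}"); // prints the rewritten expression (a typed NULL literal)
    Ok(())
}
```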
---
 Cargo.lock                                         |  19 ++
 Cargo.toml                                         |   2 +
 datafusion-examples/Cargo.toml                     |   1 +
 datafusion-examples/examples/custom_file_casts.rs  |   6 +-
 .../examples/default_column_values.rs              |   6 +-
 datafusion-examples/examples/json_shredding.rs     |   6 +-
 datafusion/core/Cargo.toml                         |   1 +
 datafusion/core/src/datasource/listing/table.rs    |   2 +-
 datafusion/core/tests/parquet/schema_adapter.rs    |   7 +-
 datafusion/datasource-parquet/Cargo.toml           |   1 +
 datafusion/datasource-parquet/src/opener.rs        |   6 +-
 datafusion/datasource-parquet/src/source.rs        |   2 +-
 datafusion/datasource/Cargo.toml                   |   1 +
 datafusion/datasource/src/file_scan_config.rs      |   2 +-
 datafusion/physical-expr-adapter/Cargo.toml        |  29 ++
 datafusion/physical-expr-adapter/README.md         |   8 +
 datafusion/physical-expr-adapter/src/lib.rs        |  31 +++
 .../src/schema_rewriter.rs                         | 293 +++++++++++++++++++--
 datafusion/physical-expr/src/lib.rs                |   3 +-
 datafusion/physical-expr/src/scalar_function.rs    |  23 ++
 docs/source/library-user-guide/upgrading.md        |  18 ++
 21 files changed, 427 insertions(+), 40 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9c887514b6..49ea259787 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1849,6 +1849,7 @@ dependencies = [
  "datafusion-macros",
  "datafusion-optimizer",
  "datafusion-physical-expr",
+ "datafusion-physical-expr-adapter",
  "datafusion-physical-expr-common",
  "datafusion-physical-optimizer",
  "datafusion-physical-plan",
@@ -2041,6 +2042,7 @@ dependencies = [
  "datafusion-execution",
  "datafusion-expr",
  "datafusion-physical-expr",
+ "datafusion-physical-expr-adapter",
  "datafusion-physical-expr-common",
  "datafusion-physical-plan",
  "datafusion-session",
@@ -2147,6 +2149,7 @@ dependencies = [
  "datafusion-expr",
  "datafusion-functions-aggregate",
  "datafusion-physical-expr",
+ "datafusion-physical-expr-adapter",
  "datafusion-physical-expr-common",
  "datafusion-physical-optimizer",
  "datafusion-physical-plan",
@@ -2180,6 +2183,7 @@ dependencies = [
  "dashmap",
  "datafusion",
  "datafusion-ffi",
+ "datafusion-physical-expr-adapter",
  "datafusion-proto",
  "env_logger",
  "futures",
@@ -2459,6 +2463,21 @@ dependencies = [
  "rstest",
 ]
 
+[[package]]
+name = "datafusion-physical-expr-adapter"
+version = "49.0.0"
+dependencies = [
+ "arrow",
+ "datafusion-common",
+ "datafusion-expr",
+ "datafusion-functions",
+ "datafusion-physical-expr",
+ "datafusion-physical-expr-common",
+ "insta",
+ "itertools 0.14.0",
+ "rstest",
+]
+
 [[package]]
 name = "datafusion-physical-expr-common"
 version = "49.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index 1de0959d6a..31db1acd38 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,6 +40,7 @@ members = [
     "datafusion/functions-window-common",
     "datafusion/optimizer",
     "datafusion/physical-expr",
+    "datafusion/physical-expr-adapter",
     "datafusion/physical-expr-common",
     "datafusion/physical-optimizer",
     "datafusion/pruning",
@@ -134,6 +135,7 @@ datafusion-functions-window-common = { path = "datafusion/functions-window-commo
 datafusion-macros = { path = "datafusion/macros", version = "49.0.0" }
 datafusion-optimizer = { path = "datafusion/optimizer", version = "49.0.0", default-features = false }
 datafusion-physical-expr = { path = "datafusion/physical-expr", version = "49.0.0", default-features = false }
+datafusion-physical-expr-adapter = { path = "datafusion/physical-expr-adapter", version = "49.0.0", default-features = false }
 datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "49.0.0", default-features = false }
 datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "49.0.0" }
 datafusion-physical-plan = { path = "datafusion/physical-plan", version = "49.0.0" }
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 409fc12bcb..f12bd9202e 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -68,6 +68,7 @@ dashmap = { workspace = true }
 base64 = "0.22.1"
 datafusion = { workspace = true, default-features = true }
 datafusion-ffi = { workspace = true }
+datafusion-physical-expr-adapter = { workspace = true }
 datafusion-proto = { workspace = true }
 env_logger = { workspace = true }
 futures = { workspace = true }
diff --git a/datafusion-examples/examples/custom_file_casts.rs b/datafusion-examples/examples/custom_file_casts.rs
index 847aa8ad7f..a787c07c2b 100644
--- a/datafusion-examples/examples/custom_file_casts.rs
+++ b/datafusion-examples/examples/custom_file_casts.rs
@@ -31,11 +31,11 @@ use datafusion::execution::context::SessionContext;
 use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::parquet::arrow::ArrowWriter;
 use datafusion::physical_expr::expressions::CastExpr;
-use datafusion::physical_expr::schema_rewriter::{
-    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
-};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::prelude::SessionConfig;
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+};
 use object_store::memory::InMemory;
 use object_store::path::Path;
 use object_store::{ObjectStore, PutPayload};
diff --git a/datafusion-examples/examples/default_column_values.rs b/datafusion-examples/examples/default_column_values.rs
index b504ef3aad..43e2d4ca09 100644
--- a/datafusion-examples/examples/default_column_values.rs
+++ b/datafusion-examples/examples/default_column_values.rs
@@ -38,12 +38,12 @@ use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
 use datafusion::parquet::arrow::ArrowWriter;
 use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
-use datafusion::physical_expr::schema_rewriter::{
-    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
-};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{lit, SessionConfig};
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+};
 use futures::StreamExt;
 use object_store::memory::InMemory;
 use object_store::path::Path;
diff --git a/datafusion-examples/examples/json_shredding.rs b/datafusion-examples/examples/json_shredding.rs
index 261bf47915..b7acb5c7b7 100644
--- a/datafusion-examples/examples/json_shredding.rs
+++ b/datafusion-examples/examples/json_shredding.rs
@@ -40,14 +40,14 @@ use datafusion::logical_expr::{
 };
 use datafusion::parquet::arrow::ArrowWriter;
 use datafusion::parquet::file::properties::WriterProperties;
-use datafusion::physical_expr::schema_rewriter::{
-    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
-};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{lit, SessionConfig};
 use datafusion::scalar::ScalarValue;
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+};
 use futures::StreamExt;
 use object_store::memory::InMemory;
 use object_store::path::Path;
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 53a17c1b93..f1f220f105 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -129,6 +129,7 @@ datafusion-functions-table = { workspace = true }
 datafusion-functions-window = { workspace = true }
 datafusion-optimizer = { workspace = true }
 datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-adapter = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-optimizer = { workspace = true }
 datafusion-physical-plan = { workspace = true }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 121ab46730..5741060bf2 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -48,7 +48,7 @@ use datafusion_execution::{
 use datafusion_expr::{
     dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
 };
-use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;
+use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
 use futures::{future, stream, Stream, StreamExt, TryStreamExt};
diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
index f9a46f2e24..f685ccdc9f 100644
--- a/datafusion/core/tests/parquet/schema_adapter.rs
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -34,10 +34,11 @@ use datafusion_datasource::schema_adapter::{
 use datafusion_datasource::ListingTableUrl;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_physical_expr::expressions::{self, Column};
-use datafusion_physical_expr::schema_rewriter::{
-    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter,
+    PhysicalExprAdapterFactory,
 };
-use datafusion_physical_expr::{DefaultPhysicalExprAdapter, PhysicalExpr};
 use itertools::Itertools;
 use object_store::{memory::InMemory, path::Path, ObjectStore};
 use parquet::arrow::ArrowWriter;
diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml
index 6bccd76b60..ae67f91184 100644
--- a/datafusion/datasource-parquet/Cargo.toml
+++ b/datafusion/datasource-parquet/Cargo.toml
@@ -42,6 +42,7 @@ datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-functions-aggregate = { workspace = true }
 datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-adapter = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-optimizer = { workspace = true }
 datafusion-physical-plan = { workspace = true }
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index af4a9075a6..709fcc5c1f 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -35,8 +35,8 @@ use datafusion_common::encryption::FileDecryptionProperties;
 
 use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_datasource::PartitionedFile;
-use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
+use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::{
     is_dynamic_physical_expr, PhysicalExpr,
 };
@@ -585,9 +585,9 @@ mod test {
     };
     use datafusion_expr::{col, lit};
     use datafusion_physical_expr::{
-        expressions::DynamicFilterPhysicalExpr, planner::logical2physical,
-        schema_rewriter::DefaultPhysicalExprAdapterFactory, PhysicalExpr,
+        expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr,
     };
+    use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
     use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
     use futures::{Stream, StreamExt};
     use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 366d42700f..caec7db0ce 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -41,7 +41,7 @@ use datafusion_common::{DataFusionError, Statistics};
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_physical_expr::conjunction;
-use datafusion_physical_expr::schema_rewriter::DefaultPhysicalExprAdapterFactory;
+use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_plan::filter_pushdown::PushedDown;
diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml
index f43894de37..6b2c6cbd40 100644
--- a/datafusion/datasource/Cargo.toml
+++ b/datafusion/datasource/Cargo.toml
@@ -53,6 +53,7 @@ datafusion-common-runtime = { workspace = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-adapter = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 datafusion-session = { workspace = true }
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 4d03c46cf5..0922607dbb 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -53,8 +53,8 @@ use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
 };
 use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
 use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
diff --git a/datafusion/physical-expr-adapter/Cargo.toml b/datafusion/physical-expr-adapter/Cargo.toml
new file mode 100644
index 0000000000..c076024b45
--- /dev/null
+++ b/datafusion/physical-expr-adapter/Cargo.toml
@@ -0,0 +1,29 @@
+[package]
+name = "datafusion-physical-expr-adapter"
+description = "Physical expression schema adaptation utilities for DataFusion"
+keywords = ["datafusion", "query", "sql"]
+readme = "README.md"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
+authors = { workspace = true }
+rust-version = { workspace = true }
+
+[lib]
+name = "datafusion_physical_expr_adapter"
+path = "src/lib.rs"
+
+[dependencies]
+arrow = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-functions = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
+itertools = { workspace = true }
+
+[dev-dependencies]
+insta = { workspace = true }
+rstest = { workspace = true }
diff --git a/datafusion/physical-expr-adapter/README.md b/datafusion/physical-expr-adapter/README.md
new file mode 100644
index 0000000000..beecd53875
--- /dev/null
+++ b/datafusion/physical-expr-adapter/README.md
@@ -0,0 +1,8 @@
+# DataFusion Physical Expression Adapter
+
+This crate provides utilities for adapting physical expressions to different schemas in DataFusion.
+
+It handles schema differences in file scans by rewriting expressions to match the physical schema,
+including type casting, missing columns, and partition values.
+
+For detailed documentation, see the [`PhysicalExprAdapter`] trait documentation.
diff --git a/datafusion/physical-expr-adapter/src/lib.rs b/datafusion/physical-expr-adapter/src/lib.rs
new file mode 100644
index 0000000000..025f1b4b63
--- /dev/null
+++ b/datafusion/physical-expr-adapter/src/lib.rs
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![doc(
+    html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
+    html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
+)]
+#![cfg_attr(docsrs, feature(doc_auto_cfg))]
+
+//! Physical expression schema adaptation utilities for DataFusion
+
+pub mod schema_rewriter;
+
+pub use schema_rewriter::{
+    DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter,
+    PhysicalExprAdapterFactory,
+};
diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
similarity index 68%
rename from datafusion/physical-expr/src/schema_rewriter.rs
rename to datafusion/physical-expr-adapter/src/schema_rewriter.rs
index d622ce4bc0..3bdff1bdfb 100644
--- a/datafusion/physical-expr/src/schema_rewriter.rs
+++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
@@ -20,27 +20,50 @@
 use std::sync::Arc;
 
 use arrow::compute::can_cast_types;
-use arrow::datatypes::{FieldRef, Schema, SchemaRef};
+use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef};
 use datafusion_common::{
     exec_err,
     tree_node::{Transformed, TransformedResult, TreeNode},
     Result, ScalarValue,
 };
+use datafusion_functions::core::getfield::GetFieldFunc;
+use datafusion_physical_expr::{
+    expressions::{self, CastExpr, Column},
+    ScalarFunctionExpr,
+};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
-use crate::expressions::{self, CastExpr, Column};
-
 /// Trait for adapting physical expressions to match a target schema.
 ///
 /// This is used in file scans to rewrite expressions so that they can be evaluated
 /// against the physical schema of the file being scanned. It allows for handling
 /// differences between logical and physical schemas, such as type mismatches or missing columns.
 ///
-/// You can create a custom implemention of this trait to handle specific rewriting logic.
+/// ## Overview
+///
+/// The `PhysicalExprAdapter` allows rewriting physical expressions to match different schemas, including:
+///
+/// - **Type casting**: When logical and physical schemas have different types, expressions are
+///   automatically wrapped with cast operations. For example, `lit(ScalarValue::Int32(123)) = int64_column`
+///   gets rewritten to `lit(ScalarValue::Int32(123)) = cast(int64_column, 'Int32')`.
+///   Note that this does not attempt to simplify such expressions - that is done by shared simplifiers.
+///
+/// - **Missing columns**: When a column exists in the logical schema but not in the physical schema,
+///   references to it are replaced with null literals.
+///
+/// - **Struct field access**: Expressions like `struct_column.field_that_is_missing_in_schema` are
+///   rewritten to `null` when the field doesn't exist in the physical schema.
+///
+/// - **Partition columns**: Partition column references can be replaced with their literal values
+///   when scanning specific partitions.
+///
+/// ## Custom Implementations
+///
+/// You can create a custom implementation of this trait to handle specific rewriting logic.
 /// For example, to fill in missing columns with default values instead of nulls:
 ///
 /// ```rust
-/// use datafusion_physical_expr::schema_rewriter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
+/// use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
 /// use arrow::datatypes::{Schema, Field, DataType, FieldRef, SchemaRef};
 /// use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 /// use datafusion_common::{Result, ScalarValue, tree_node::{Transformed, TransformedResult, TreeNode}};
@@ -151,7 +174,7 @@ impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory {
 /// # Example
 ///
 /// ```rust
-/// use datafusion_physical_expr::schema_rewriter::{DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory};
+/// use datafusion_physical_expr_adapter::{DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory};
 /// use arrow::datatypes::Schema;
 /// use std::sync::Arc;
 ///
@@ -220,6 +243,10 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
         &self,
         expr: Arc<dyn PhysicalExpr>,
     ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+        if let Some(transformed) = self.try_rewrite_struct_field_access(&expr)? {
+            return Ok(Transformed::yes(transformed));
+        }
+
         if let Some(column) = expr.as_any().downcast_ref::<Column>() {
             return self.rewrite_column(Arc::clone(&expr), column);
         }
@@ -227,6 +254,88 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
         Ok(Transformed::no(expr))
     }
 
+    /// Attempt to rewrite struct field access expressions to return null if the field does not exist in the physical schema.
+    /// Note that this does *not* handle nested struct fields, only top-level struct field access.
+    /// See <https://github.com/apache/datafusion/issues/17114> for more details.
+    fn try_rewrite_struct_field_access(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        let get_field_expr =
+            match ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(expr.as_ref()) {
+                Some(expr) => expr,
+                None => return Ok(None),
+            };
+
+        let source_expr = match get_field_expr.args().first() {
+            Some(expr) => expr,
+            None => return Ok(None),
+        };
+
+        let field_name_expr = match get_field_expr.args().get(1) {
+            Some(expr) => expr,
+            None => return Ok(None),
+        };
+
+        let lit = match field_name_expr
+            .as_any()
+            .downcast_ref::<expressions::Literal>()
+        {
+            Some(lit) => lit,
+            None => return Ok(None),
+        };
+
+        let field_name = match lit.value().try_as_str().flatten() {
+            Some(name) => name,
+            None => return Ok(None),
+        };
+
+        let column = match source_expr.as_any().downcast_ref::<Column>() {
+            Some(column) => column,
+            None => return Ok(None),
+        };
+
+        let physical_field =
+            match self.physical_file_schema.field_with_name(column.name()) {
+                Ok(field) => field,
+                Err(_) => return Ok(None),
+            };
+
+        let physical_struct_fields = match physical_field.data_type() {
+            DataType::Struct(fields) => fields,
+            _ => return Ok(None),
+        };
+
+        if physical_struct_fields
+            .iter()
+            .any(|f| f.name() == field_name)
+        {
+            return Ok(None);
+        }
+
+        let logical_field = match self.logical_file_schema.field_with_name(column.name())
+        {
+            Ok(field) => field,
+            Err(_) => return Ok(None),
+        };
+
+        let logical_struct_fields = match logical_field.data_type() {
+            DataType::Struct(fields) => fields,
+            _ => return Ok(None),
+        };
+
+        let logical_struct_field = match logical_struct_fields
+            .iter()
+            .find(|f| f.name() == field_name)
+        {
+            Some(field) => field,
+            None => return Ok(None),
+        };
+
+        let null_value = ScalarValue::Null.cast_to(logical_struct_field.data_type())?;
+        Ok(Some(expressions::lit(null_value)))
+    }
+
     fn rewrite_column(
         &self,
         expr: Arc<dyn PhysicalExpr>,
@@ -304,7 +413,9 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
         // TODO: add optimization to move the cast from the column to literal expressions in the case of `col = 123`
         // since that's much cheaper to evalaute.
         // See https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928
-        if !can_cast_types(physical_field.data_type(), logical_field.data_type()) {
+        let is_compatible =
+            can_cast_types(physical_field.data_type(), logical_field.data_type());
+        if !is_compatible {
             return exec_err!(
                 "Cannot cast column '{}' from '{}' (physical data type) to 
'{}' (logical data type)",
                 column.name(),
@@ -332,15 +443,13 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
 
 #[cfg(test)]
 mod tests {
-    use crate::expressions::{col, lit};
-
     use super::*;
-    use arrow::{
-        array::{RecordBatch, RecordBatchOptions},
-        datatypes::{DataType, Field, Schema, SchemaRef},
-    };
-    use datafusion_common::{record_batch, ScalarValue};
+    use arrow::array::{RecordBatch, RecordBatchOptions};
+    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+    use datafusion_common::{assert_contains, record_batch, Result, ScalarValue};
     use datafusion_expr::Operator;
+    use datafusion_physical_expr::expressions::{col, lit, CastExpr, Column, Literal};
+    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
     use itertools::Itertools;
     use std::sync::Arc;
 
@@ -413,7 +522,7 @@ mod tests {
             Arc::new(expected),
             Operator::Or,
             Arc::new(expressions::BinaryExpr::new(
-                lit(ScalarValue::Null),
+                lit(ScalarValue::Float64(None)), // c is missing, so it becomes null
                 Operator::Gt,
                 Arc::new(expressions::Literal::new(ScalarValue::Float64(Some(0.0)))),
             )),
@@ -426,6 +535,75 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_rewrite_struct_column_incompatible() {
+        let physical_schema = Schema::new(vec![Field::new(
+            "data",
+            DataType::Struct(vec![Field::new("field1", DataType::Binary, 
true)].into()),
+            true,
+        )]);
+
+        let logical_schema = Schema::new(vec![Field::new(
+            "data",
+            DataType::Struct(vec![Field::new("field1", DataType::Int32, 
true)].into()),
+            true,
+        )]);
+
+        let factory = DefaultPhysicalExprAdapterFactory;
+        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
+        let column_expr = Arc::new(Column::new("data", 0));
+
+        let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
+        assert_contains!(error_msg, "Cannot cast column 'data'");
+    }
+
+    #[test]
+    fn test_rewrite_struct_compatible_cast() {
+        let physical_schema = Schema::new(vec![Field::new(
+            "data",
+            DataType::Struct(
+                vec![
+                    Field::new("id", DataType::Int32, false),
+                    Field::new("name", DataType::Utf8, true),
+                ]
+                .into(),
+            ),
+            false,
+        )]);
+
+        let logical_schema = Schema::new(vec![Field::new(
+            "data",
+            DataType::Struct(
+                vec![
+                    Field::new("id", DataType::Int64, false),
+                    Field::new("name", DataType::Utf8View, true),
+                ]
+                .into(),
+            ),
+            false,
+        )]);
+
+        let factory = DefaultPhysicalExprAdapterFactory;
+        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
+        let column_expr = Arc::new(Column::new("data", 0));
+
+        let result = adapter.rewrite(column_expr).unwrap();
+
+        let expected = Arc::new(CastExpr::new(
+            Arc::new(Column::new("data", 0)),
+            DataType::Struct(
+                vec![
+                    Field::new("id", DataType::Int64, false),
+                    Field::new("name", DataType::Utf8View, true),
+                ]
+                .into(),
+            ),
+            None,
+        )) as Arc<dyn PhysicalExpr>;
+
+        assert_eq!(result.to_string(), expected.to_string());
+    }
+
     #[test]
     fn test_rewrite_missing_column() -> Result<()> {
         let (physical_schema, logical_schema) = create_test_schema();
@@ -446,6 +624,42 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_rewrite_missing_column_non_nullable_error() {
+        let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let logical_schema = Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Utf8, false), // Missing and non-nullable
+        ]);
+
+        let factory = DefaultPhysicalExprAdapterFactory;
+        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
+        let column_expr = Arc::new(Column::new("b", 1));
+
+        let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
+        assert_contains!(error_msg, "Non-nullable column 'b' is missing");
+    }
+
+    #[test]
+    fn test_rewrite_missing_column_nullable() {
+        let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let logical_schema = Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Utf8, true), // Missing but nullable
+        ]);
+
+        let factory = DefaultPhysicalExprAdapterFactory;
+        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
+        let column_expr = Arc::new(Column::new("b", 1));
+
+        let result = adapter.rewrite(column_expr).unwrap();
+
+        let expected =
+            Arc::new(Literal::new(ScalarValue::Utf8(None))) as Arc<dyn PhysicalExpr>;
+
+        assert_eq!(result.to_string(), expected.to_string());
+    }
+
     #[test]
     fn test_rewrite_partition_column() -> Result<()> {
         let (physical_schema, logical_schema) = create_test_schema();
@@ -509,13 +723,13 @@ mod tests {
 
         let result = adapter.rewrite(column_expr);
         assert!(result.is_err());
-        assert!(result
-            .unwrap_err()
-            .to_string()
-            .contains("Non-nullable column 'b' is missing"));
+        assert_contains!(
+            result.unwrap_err().to_string(),
+            "Non-nullable column 'b' is missing from the physical schema"
+        );
     }
 
-    /// Roughly stolen from ProjectionExec
+    /// Helper function to project expressions onto a RecordBatch
     fn batch_project(
         expr: Vec<Arc<dyn PhysicalExpr>>,
         batch: &RecordBatch,
@@ -606,4 +820,43 @@ mod tests {
             vec![Some(1), None, Some(3)]
         );
     }
+
+    #[test]
+    fn test_try_rewrite_struct_field_access() {
+        // Test the core logic of try_rewrite_struct_field_access
+        let physical_schema = Schema::new(vec![Field::new(
+            "struct_col",
+            DataType::Struct(
+                vec![Field::new("existing_field", DataType::Int32, 
true)].into(),
+            ),
+            true,
+        )]);
+
+        let logical_schema = Schema::new(vec![Field::new(
+            "struct_col",
+            DataType::Struct(
+                vec![
+                    Field::new("existing_field", DataType::Int32, true),
+                    Field::new("missing_field", DataType::Utf8, true),
+                ]
+                .into(),
+            ),
+            true,
+        )]);
+
+        let rewriter = DefaultPhysicalExprAdapterRewriter {
+            logical_file_schema: &logical_schema,
+            physical_file_schema: &physical_schema,
+            partition_fields: &[],
+        };
+
+        // Test that when a field exists in physical schema, it returns None
+        let column = Arc::new(Column::new("struct_col", 0)) as Arc<dyn 
PhysicalExpr>;
+        let result = 
rewriter.try_rewrite_struct_field_access(&column).unwrap();
+        assert!(result.is_none());
+
+        // The actual test for the get_field expression would require creating a proper ScalarFunctionExpr
+        // with ScalarUDF, which is complex to set up in a unit test. The integration tests in
+        // datafusion/core/tests/parquet/schema_adapter.rs provide better coverage for this functionality.
+    }
 }
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 845c358d7e..46f7b30d01 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -38,7 +38,6 @@ mod partitioning;
 mod physical_expr;
 pub mod planner;
 mod scalar_function;
-pub mod schema_rewriter;
 pub mod simplifier;
 pub mod statistics;
 pub mod utils;
@@ -70,7 +69,7 @@ pub use datafusion_physical_expr_common::sort_expr::{
 
 pub use planner::{create_physical_expr, create_physical_exprs};
 pub use scalar_function::ScalarFunctionExpr;
-pub use schema_rewriter::DefaultPhysicalExprAdapter;
+pub use simplifier::PhysicalExprSimplifier;
 pub use utils::{conjunction, conjunction_opt, split_conjunction};
 
 // For backwards compatibility
diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs
index e185cca752..7a3c463f25 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -165,6 +165,29 @@ impl ScalarFunctionExpr {
     pub fn config_options(&self) -> &ConfigOptions {
         &self.config_options
     }
+
+    /// Given an arbitrary PhysicalExpr attempt to downcast it to a ScalarFunctionExpr
+    /// and verify that its inner function is of type T.
+    /// If the downcast fails, or the function is not of type T, returns `None`.
+    /// Otherwise returns `Some(ScalarFunctionExpr)`.
+    pub fn try_downcast_func<T>(expr: &dyn PhysicalExpr) -> Option<&ScalarFunctionExpr>
+    where
+        T: 'static,
+    {
+        match expr.as_any().downcast_ref::<ScalarFunctionExpr>() {
+            Some(scalar_expr)
+                if scalar_expr
+                    .fun()
+                    .inner()
+                    .as_any()
+                    .downcast_ref::<T>()
+                    .is_some() =>
+            {
+                Some(scalar_expr)
+            }
+            _ => None,
+        }
+    }
 }
 
 impl fmt::Display for ScalarFunctionExpr {
diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md
index 57ce6da117..5c352c1a6e 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -154,6 +154,24 @@ impl AsyncScalarUDFImpl for AskLLM {
 # */
 ```
 
+### Schema Rewriter Module Moved to New Crate
+
+The `schema_rewriter` module and its associated symbols have been moved from `datafusion_physical_expr` to a new crate `datafusion_physical_expr_adapter`. This affects the following symbols:
+
+- `DefaultPhysicalExprAdapter`
+- `DefaultPhysicalExprAdapterFactory`
+- `PhysicalExprAdapter`
+- `PhysicalExprAdapterFactory`
+
+To upgrade, change your imports to:
+
+```rust
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory,
+    PhysicalExprAdapter, PhysicalExprAdapterFactory
+};
+```
+
 ### Upgrade to arrow `56.0.0` and parquet `56.0.0`
 
 This version of DataFusion upgrades the underlying Apache Arrow implementation


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

