This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new e3d3257fc0 feat: add `datafusion-physical-adapter`, implement predicate adaptation missing fields of structs (#16589)
e3d3257fc0 is described below

commit e3d3257fc03564f0f4295a299208d8735fe6671c
Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com>
AuthorDate: Fri Aug 15 08:00:20 2025 -0500

    feat: add `datafusion-physical-adapter`, implement predicate adaptation missing fields of structs (#16589)
---
 Cargo.lock                                        |  19 ++
 Cargo.toml                                        |   2 +
 datafusion-examples/Cargo.toml                    |   1 +
 datafusion-examples/examples/custom_file_casts.rs |   6 +-
 .../examples/default_column_values.rs             |   6 +-
 datafusion-examples/examples/json_shredding.rs    |   6 +-
 datafusion/core/Cargo.toml                        |   1 +
 datafusion/core/src/datasource/listing/table.rs   |   2 +-
 datafusion/core/tests/parquet/schema_adapter.rs   |   7 +-
 datafusion/datasource-parquet/Cargo.toml          |   1 +
 datafusion/datasource-parquet/src/opener.rs       |   6 +-
 datafusion/datasource-parquet/src/source.rs       |   2 +-
 datafusion/datasource/Cargo.toml                  |   1 +
 datafusion/datasource/src/file_scan_config.rs     |   2 +-
 datafusion/physical-expr-adapter/Cargo.toml       |  29 ++
 datafusion/physical-expr-adapter/README.md        |   8 +
 datafusion/physical-expr-adapter/src/lib.rs       |  31 +++
 .../src/schema_rewriter.rs                        | 293 +++++++++++++++++++--
 datafusion/physical-expr/src/lib.rs               |   3 +-
 datafusion/physical-expr/src/scalar_function.rs   |  23 ++
 docs/source/library-user-guide/upgrading.md       |  18 ++
 21 files changed, 427 insertions(+), 40 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9c887514b6..49ea259787 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1849,6 +1849,7 @@ dependencies = [
  "datafusion-macros",
  "datafusion-optimizer",
  "datafusion-physical-expr",
+ "datafusion-physical-expr-adapter",
  "datafusion-physical-expr-common",
  "datafusion-physical-optimizer",
  "datafusion-physical-plan",
@@ -2041,6 +2042,7 @@ dependencies = [
  "datafusion-execution",
  "datafusion-expr",
  "datafusion-physical-expr",
+ "datafusion-physical-expr-adapter",
  "datafusion-physical-expr-common",
  "datafusion-physical-plan",
  "datafusion-session",
@@ -2147,6 +2149,7 @@ dependencies = [
  "datafusion-expr",
  "datafusion-functions-aggregate",
  "datafusion-physical-expr",
+ "datafusion-physical-expr-adapter",
  "datafusion-physical-expr-common",
  "datafusion-physical-optimizer",
  "datafusion-physical-plan",
@@ -2180,6 +2183,7 @@ dependencies = [
  "dashmap",
  "datafusion",
  "datafusion-ffi",
+ "datafusion-physical-expr-adapter",
  "datafusion-proto",
  "env_logger",
  "futures",
@@ -2459,6 +2463,21 @@ dependencies = [
  "rstest",
 ]
 
+[[package]]
+name = "datafusion-physical-expr-adapter"
+version = "49.0.0"
+dependencies = [
+ "arrow",
+ "datafusion-common",
+ "datafusion-expr",
+ "datafusion-functions",
+ "datafusion-physical-expr",
+ "datafusion-physical-expr-common",
+ "insta",
+ "itertools 0.14.0",
+ "rstest",
+]
+
 [[package]]
 name = "datafusion-physical-expr-common"
 version = "49.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index 1de0959d6a..31db1acd38 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,6 +40,7 @@ members = [
     "datafusion/functions-window-common",
     "datafusion/optimizer",
     "datafusion/physical-expr",
+    "datafusion/physical-expr-adapter",
     "datafusion/physical-expr-common",
     "datafusion/physical-optimizer",
     "datafusion/pruning",
@@ -134,6 +135,7 @@ datafusion-functions-window-common = { path = "datafusion/functions-window-commo
 datafusion-macros = { path = "datafusion/macros", version = "49.0.0" }
 datafusion-optimizer = { path = "datafusion/optimizer", version = "49.0.0", default-features = false }
 datafusion-physical-expr = { path = "datafusion/physical-expr", version = "49.0.0", default-features = false }
+datafusion-physical-expr-adapter = { path = "datafusion/physical-expr-adapter", version = "49.0.0", default-features = false }
 datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "49.0.0", default-features = false }
 datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "49.0.0" }
 datafusion-physical-plan = { path = "datafusion/physical-plan", version = "49.0.0" }
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 409fc12bcb..f12bd9202e 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -68,6 +68,7 @@ dashmap = { workspace = true }
 base64 = "0.22.1"
 datafusion = { workspace = true, default-features = true }
 datafusion-ffi = { workspace = true }
+datafusion-physical-expr-adapter = { workspace = true }
 datafusion-proto = { workspace = true }
 env_logger = { workspace = true }
 futures = { workspace = true }
diff --git a/datafusion-examples/examples/custom_file_casts.rs b/datafusion-examples/examples/custom_file_casts.rs
index 847aa8ad7f..a787c07c2b 100644
--- a/datafusion-examples/examples/custom_file_casts.rs
+++ b/datafusion-examples/examples/custom_file_casts.rs
@@ -31,11 +31,11 @@ use datafusion::execution::context::SessionContext;
 use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::parquet::arrow::ArrowWriter;
 use datafusion::physical_expr::expressions::CastExpr;
-use datafusion::physical_expr::schema_rewriter::{
-    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
-};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::prelude::SessionConfig;
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+};
 use object_store::memory::InMemory;
 use object_store::path::Path;
 use object_store::{ObjectStore, PutPayload};
diff --git a/datafusion-examples/examples/default_column_values.rs b/datafusion-examples/examples/default_column_values.rs
index b504ef3aad..43e2d4ca09 100644
--- a/datafusion-examples/examples/default_column_values.rs
+++ b/datafusion-examples/examples/default_column_values.rs
@@ -38,12 +38,12 @@ use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
 use datafusion::parquet::arrow::ArrowWriter;
 use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
-use datafusion::physical_expr::schema_rewriter::{
-    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
-};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{lit, SessionConfig};
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+};
 use futures::StreamExt;
 use object_store::memory::InMemory;
 use object_store::path::Path;
diff --git a/datafusion-examples/examples/json_shredding.rs b/datafusion-examples/examples/json_shredding.rs
index 261bf47915..b7acb5c7b7 100644
--- a/datafusion-examples/examples/json_shredding.rs
+++ b/datafusion-examples/examples/json_shredding.rs
@@ -40,14 +40,14 @@ use datafusion::logical_expr::{
 };
 use datafusion::parquet::arrow::ArrowWriter;
 use datafusion::parquet::file::properties::WriterProperties;
-use datafusion::physical_expr::schema_rewriter::{
-    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
-};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{lit, SessionConfig};
 use datafusion::scalar::ScalarValue;
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+};
 use futures::StreamExt;
 use object_store::memory::InMemory;
 use object_store::path::Path;
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 53a17c1b93..f1f220f105 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -129,6 +129,7 @@ datafusion-functions-table = { workspace = true }
 datafusion-functions-window = { workspace = true }
 datafusion-optimizer = { workspace = true }
 datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-adapter = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-optimizer = { workspace = true }
 datafusion-physical-plan = { workspace = true }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 121ab46730..5741060bf2 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -48,7 +48,7 @@ use datafusion_execution::{
 use datafusion_expr::{
     dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
 };
-use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;
+use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
 use futures::{future, stream, Stream, StreamExt, TryStreamExt};
diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
index f9a46f2e24..f685ccdc9f 100644
--- a/datafusion/core/tests/parquet/schema_adapter.rs
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -34,10 +34,11 @@ use datafusion_datasource::schema_adapter::{
 use datafusion_datasource::ListingTableUrl;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_physical_expr::expressions::{self, Column};
-use datafusion_physical_expr::schema_rewriter::{
-    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter,
+    PhysicalExprAdapterFactory,
 };
-use datafusion_physical_expr::{DefaultPhysicalExprAdapter, PhysicalExpr};
 use itertools::Itertools;
 use object_store::{memory::InMemory, path::Path, ObjectStore};
 use parquet::arrow::ArrowWriter;
diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml
index 6bccd76b60..ae67f91184 100644
--- a/datafusion/datasource-parquet/Cargo.toml
+++ b/datafusion/datasource-parquet/Cargo.toml
@@ -42,6 +42,7 @@ datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-functions-aggregate = { workspace = true }
 datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-adapter = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-optimizer = { workspace = true }
 datafusion-physical-plan = { workspace = true }
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index af4a9075a6..709fcc5c1f 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -35,8 +35,8 @@ use datafusion_common::encryption::FileDecryptionProperties;
 use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_datasource::PartitionedFile;
 
-use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
+use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::{
     is_dynamic_physical_expr, PhysicalExpr,
 };
@@ -585,9 +585,9 @@ mod test {
     };
     use datafusion_expr::{col, lit};
     use datafusion_physical_expr::{
-        expressions::DynamicFilterPhysicalExpr, planner::logical2physical,
-        schema_rewriter::DefaultPhysicalExprAdapterFactory, PhysicalExpr,
+        expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr,
     };
+    use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
     use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
     use futures::{Stream, StreamExt};
     use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 366d42700f..caec7db0ce 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -41,7 +41,7 @@ use datafusion_common::{DataFusionError, Statistics};
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_physical_expr::conjunction;
-use datafusion_physical_expr::schema_rewriter::DefaultPhysicalExprAdapterFactory;
+use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_plan::filter_pushdown::PushedDown;
diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml
index f43894de37..6b2c6cbd40 100644
--- a/datafusion/datasource/Cargo.toml
+++ b/datafusion/datasource/Cargo.toml
@@ -53,6 +53,7 @@ datafusion-common-runtime = { workspace = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-adapter = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 datafusion-session = { workspace = true }
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 4d03c46cf5..0922607dbb 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -53,8 +53,8 @@ use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
 };
 use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
 use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
diff --git a/datafusion/physical-expr-adapter/Cargo.toml b/datafusion/physical-expr-adapter/Cargo.toml
new file mode 100644
index 0000000000..c076024b45
--- /dev/null
+++ b/datafusion/physical-expr-adapter/Cargo.toml
@@ -0,0 +1,29 @@
+[package]
+name = "datafusion-physical-expr-adapter"
+description = "Physical expression schema adaptation utilities for DataFusion"
+keywords = ["datafusion", "query", "sql"]
+readme = "README.md"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
+authors = { workspace = true }
+rust-version = { workspace = true }
+
+[lib]
+name = "datafusion_physical_expr_adapter"
+path = "src/lib.rs"
+
+[dependencies]
+arrow = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-functions = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
+itertools = { workspace = true }
+
+[dev-dependencies]
+insta = { workspace = true }
+rstest = { workspace = true }
diff --git a/datafusion/physical-expr-adapter/README.md b/datafusion/physical-expr-adapter/README.md
new file mode 100644
index 0000000000..beecd53875
--- /dev/null
+++ b/datafusion/physical-expr-adapter/README.md
@@ -0,0 +1,8 @@
+# DataFusion Physical Expression Adapter
+
+This crate provides utilities for adapting physical expressions to different schemas in DataFusion.
+
+It handles schema differences in file scans by rewriting expressions to match the physical schema,
+including type casting, missing columns, and partition values.
+
+For detailed documentation, see the [`PhysicalExprAdapter`] trait documentation.
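As context for the new `datafusion-physical-expr-adapter` crate introduced above, here is a minimal usage sketch based on the factory API exercised by this commit's tests; the schema layout and column names are illustrative only, not part of the commit:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_adapter::{
    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
};

fn main() -> Result<()> {
    // Logical (table) schema: column `b` is declared and nullable.
    let logical = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Utf8, true),
    ]));
    // Physical (file) schema: `b` is missing and `a` is stored as Int32.
    let physical = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    let factory = DefaultPhysicalExprAdapterFactory;
    let adapter = factory.create(logical, physical);

    // A reference to the missing, nullable column `b` is rewritten to a typed null
    // literal; a reference to `a` would instead be wrapped in a cast to Int64.
    let rewritten = adapter.rewrite(Arc::new(Column::new("b", 1)))?;
    println!("{rewritten}");
    Ok(())
}
```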
diff --git a/datafusion/physical-expr-adapter/src/lib.rs b/datafusion/physical-expr-adapter/src/lib.rs
new file mode 100644
index 0000000000..025f1b4b63
--- /dev/null
+++ b/datafusion/physical-expr-adapter/src/lib.rs
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![doc(
+    html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
+    html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
+)]
+#![cfg_attr(docsrs, feature(doc_auto_cfg))]
+
+//! Physical expression schema adaptation utilities for DataFusion
+
+pub mod schema_rewriter;
+
+pub use schema_rewriter::{
+    DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter,
+    PhysicalExprAdapterFactory,
+};
diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
similarity index 68%
rename from datafusion/physical-expr/src/schema_rewriter.rs
rename to datafusion/physical-expr-adapter/src/schema_rewriter.rs
index d622ce4bc0..3bdff1bdfb 100644
--- a/datafusion/physical-expr/src/schema_rewriter.rs
+++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
@@ -20,27 +20,50 @@ use std::sync::Arc;
 
 use arrow::compute::can_cast_types;
-use arrow::datatypes::{FieldRef, Schema, SchemaRef};
+use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef};
 use datafusion_common::{
     exec_err,
     tree_node::{Transformed, TransformedResult, TreeNode},
     Result, ScalarValue,
 };
+use datafusion_functions::core::getfield::GetFieldFunc;
+use datafusion_physical_expr::{
    expressions::{self, CastExpr, Column},
    ScalarFunctionExpr,
};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
-use crate::expressions::{self, CastExpr, Column};
-
 /// Trait for adapting physical expressions to match a target schema.
 ///
 /// This is used in file scans to rewrite expressions so that they can be evaluated
 /// against the physical schema of the file being scanned. It allows for handling
 /// differences between logical and physical schemas, such as type mismatches or missing columns.
 ///
-/// You can create a custom implemention of this trait to handle specific rewriting logic.
+/// ## Overview
+///
+/// The `PhysicalExprAdapter` allows rewriting physical expressions to match different schemas, including:
+///
+/// - **Type casting**: When logical and physical schemas have different types, expressions are
+///   automatically wrapped with cast operations. For example, `lit(ScalarValue::Int32(123)) = int64_column`
+///   gets rewritten to `lit(ScalarValue::Int32(123)) = cast(int64_column, 'Int32')`.
+///   Note that this does not attempt to simplify such expressions - that is done by shared simplifiers.
+///
+/// - **Missing columns**: When a column exists in the logical schema but not in the physical schema,
+///   references to it are replaced with null literals.
+///
+/// - **Struct field access**: Expressions like `struct_column.field_that_is_missing_in_schema` are
+///   rewritten to `null` when the field doesn't exist in the physical schema.
+///
+/// - **Partition columns**: Partition column references can be replaced with their literal values
+///   when scanning specific partitions.
+///
+/// ## Custom Implementations
+///
+/// You can create a custom implementation of this trait to handle specific rewriting logic.
 /// For example, to fill in missing columns with default values instead of nulls:
 ///
 /// ```rust
-/// use datafusion_physical_expr::schema_rewriter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
+/// use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
 /// use arrow::datatypes::{Schema, Field, DataType, FieldRef, SchemaRef};
 /// use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 /// use datafusion_common::{Result, ScalarValue, tree_node::{Transformed, TransformedResult, TreeNode}};
@@ -151,7 +174,7 @@ impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory {
 /// # Example
 ///
 /// ```rust
-/// use datafusion_physical_expr::schema_rewriter::{DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory};
+/// use datafusion_physical_expr_adapter::{DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory};
 /// use arrow::datatypes::Schema;
 /// use std::sync::Arc;
 ///
@@ -220,6 +243,10 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
         &self,
         expr: Arc<dyn PhysicalExpr>,
     ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+        if let Some(transformed) = self.try_rewrite_struct_field_access(&expr)? {
+            return Ok(Transformed::yes(transformed));
+        }
+
         if let Some(column) = expr.as_any().downcast_ref::<Column>() {
             return self.rewrite_column(Arc::clone(&expr), column);
         }
@@ -227,6 +254,88 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
         Ok(Transformed::no(expr))
     }
 
+    /// Attempt to rewrite struct field access expressions to return null if the field does not exist in the physical schema.
+    /// Note that this does *not* handle nested struct fields, only top-level struct field access.
+    /// See <https://github.com/apache/datafusion/issues/17114> for more details.
+    fn try_rewrite_struct_field_access(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        let get_field_expr =
+            match ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(expr.as_ref()) {
+                Some(expr) => expr,
+                None => return Ok(None),
+            };
+
+        let source_expr = match get_field_expr.args().first() {
+            Some(expr) => expr,
+            None => return Ok(None),
+        };
+
+        let field_name_expr = match get_field_expr.args().get(1) {
+            Some(expr) => expr,
+            None => return Ok(None),
+        };
+
+        let lit = match field_name_expr
+            .as_any()
+            .downcast_ref::<expressions::Literal>()
+        {
+            Some(lit) => lit,
+            None => return Ok(None),
+        };
+
+        let field_name = match lit.value().try_as_str().flatten() {
+            Some(name) => name,
+            None => return Ok(None),
+        };
+
+        let column = match source_expr.as_any().downcast_ref::<Column>() {
+            Some(column) => column,
+            None => return Ok(None),
+        };
+
+        let physical_field =
+            match self.physical_file_schema.field_with_name(column.name()) {
+                Ok(field) => field,
+                Err(_) => return Ok(None),
+            };
+
+        let physical_struct_fields = match physical_field.data_type() {
+            DataType::Struct(fields) => fields,
+            _ => return Ok(None),
+        };
+
+        if physical_struct_fields
+            .iter()
+            .any(|f| f.name() == field_name)
+        {
+            return Ok(None);
+        }
+
+        let logical_field = match self.logical_file_schema.field_with_name(column.name())
+        {
+            Ok(field) => field,
+            Err(_) => return Ok(None),
+        };
+
+        let logical_struct_fields = match logical_field.data_type() {
+            DataType::Struct(fields) => fields,
+            _ => return Ok(None),
+        };
+
+        let logical_struct_field = match logical_struct_fields
+            .iter()
+            .find(|f| f.name() == field_name)
+        {
+            Some(field) => field,
+            None => return Ok(None),
+        };
+
+        let null_value = ScalarValue::Null.cast_to(logical_struct_field.data_type())?;
+        Ok(Some(expressions::lit(null_value)))
+    }
+
     fn rewrite_column(
         &self,
         expr: Arc<dyn PhysicalExpr>,
@@ -304,7 +413,9 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
         // TODO: add optimization to move the cast from the column to literal expressions in the case of `col = 123`
         // since that's much cheaper to evalaute.
         // See https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928
-        if !can_cast_types(physical_field.data_type(), logical_field.data_type()) {
+        let is_compatible =
+            can_cast_types(physical_field.data_type(), logical_field.data_type());
+        if !is_compatible {
             return exec_err!(
                 "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)",
                 column.name(),
@@ -332,15 +443,13 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
 
 #[cfg(test)]
 mod tests {
-    use crate::expressions::{col, lit};
-
     use super::*;
 
-    use arrow::{
-        array::{RecordBatch, RecordBatchOptions},
-        datatypes::{DataType, Field, Schema, SchemaRef},
-    };
-    use datafusion_common::{record_batch, ScalarValue};
+    use arrow::array::{RecordBatch, RecordBatchOptions};
+    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+    use datafusion_common::{assert_contains, record_batch, Result, ScalarValue};
     use datafusion_expr::Operator;
+    use datafusion_physical_expr::expressions::{col, lit, CastExpr, Column, Literal};
+    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
     use itertools::Itertools;
     use std::sync::Arc;
 
@@ -413,7 +522,7 @@ mod tests {
             Arc::new(expected),
             Operator::Or,
             Arc::new(expressions::BinaryExpr::new(
-                lit(ScalarValue::Null),
+                lit(ScalarValue::Float64(None)), // c is missing, so it becomes null
                 Operator::Gt,
                 Arc::new(expressions::Literal::new(ScalarValue::Float64(Some(0.0)))),
             )),
@@ -426,6 +535,75 @@ mod tests {
         );
     }
+    #[test]
+    fn test_rewrite_struct_column_incompatible() {
+        let physical_schema = Schema::new(vec![Field::new(
+            "data",
+            DataType::Struct(vec![Field::new("field1", DataType::Binary, true)].into()),
+            true,
+        )]);
+
+        let logical_schema = Schema::new(vec![Field::new(
+            "data",
+            DataType::Struct(vec![Field::new("field1", DataType::Int32, true)].into()),
+            true,
+        )]);
+
+        let factory = DefaultPhysicalExprAdapterFactory;
+        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
+        let column_expr = Arc::new(Column::new("data", 0));
+
+        let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
+        assert_contains!(error_msg, "Cannot cast column 'data'");
+    }
+
+    #[test]
+    fn test_rewrite_struct_compatible_cast() {
+        let physical_schema = Schema::new(vec![Field::new(
+            "data",
+            DataType::Struct(
+                vec![
+                    Field::new("id", DataType::Int32, false),
+                    Field::new("name", DataType::Utf8, true),
+                ]
+                .into(),
+            ),
+            false,
+        )]);
+
+        let logical_schema = Schema::new(vec![Field::new(
+            "data",
+            DataType::Struct(
+                vec![
+                    Field::new("id", DataType::Int64, false),
+                    Field::new("name", DataType::Utf8View, true),
+                ]
+                .into(),
+            ),
+            false,
+        )]);
+
+        let factory = DefaultPhysicalExprAdapterFactory;
+        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
+        let column_expr = Arc::new(Column::new("data", 0));
+
+        let result = adapter.rewrite(column_expr).unwrap();
+
+        let expected = Arc::new(CastExpr::new(
+            Arc::new(Column::new("data", 0)),
+            DataType::Struct(
+                vec![
+                    Field::new("id", DataType::Int64, false),
+                    Field::new("name", DataType::Utf8View, true),
+                ]
+                .into(),
+            ),
+            None,
+        )) as Arc<dyn PhysicalExpr>;
+
+        assert_eq!(result.to_string(), expected.to_string());
+    }
+
     #[test]
     fn test_rewrite_missing_column() -> Result<()> {
         let (physical_schema, logical_schema) = create_test_schema();
@@ -446,6 +624,42 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_rewrite_missing_column_non_nullable_error() {
+        let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let logical_schema = Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Utf8, false), // Missing and non-nullable
+        ]);
+
+        let factory = DefaultPhysicalExprAdapterFactory;
+        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
+        let column_expr = Arc::new(Column::new("b", 1));
+
+        let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
+        assert_contains!(error_msg, "Non-nullable column 'b' is missing");
+    }
+
+    #[test]
+    fn test_rewrite_missing_column_nullable() {
+        let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let logical_schema = Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Utf8, true), // Missing but nullable
+        ]);
+
+        let factory = DefaultPhysicalExprAdapterFactory;
+        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
+        let column_expr = Arc::new(Column::new("b", 1));
+
+        let result = adapter.rewrite(column_expr).unwrap();
+
+        let expected =
+            Arc::new(Literal::new(ScalarValue::Utf8(None))) as Arc<dyn PhysicalExpr>;
+
+        assert_eq!(result.to_string(), expected.to_string());
+    }
+
     #[test]
     fn test_rewrite_partition_column() -> Result<()> {
         let (physical_schema, logical_schema) = create_test_schema();
@@ -509,13 +723,13 @@ mod tests {
         let result = adapter.rewrite(column_expr);
 
         assert!(result.is_err());
-        assert!(result
-            .unwrap_err()
-            .to_string()
-            .contains("Non-nullable column 'b' is missing"));
+        assert_contains!(
+            result.unwrap_err().to_string(),
+            "Non-nullable column 'b' is missing from the physical schema"
+        );
     }
 
-    /// Roughly stolen from ProjectionExec
+    /// Helper function to project expressions onto a RecordBatch
     fn batch_project(
         expr: Vec<Arc<dyn PhysicalExpr>>,
         batch: &RecordBatch,
@@ -606,4 +820,43 @@ mod tests {
             vec![Some(1), None, Some(3)]
         );
     }
+
+    #[test]
+    fn test_try_rewrite_struct_field_access() {
+        // Test the core logic of try_rewrite_struct_field_access
+        let physical_schema = Schema::new(vec![Field::new(
+            "struct_col",
+            DataType::Struct(
+                vec![Field::new("existing_field", DataType::Int32, true)].into(),
+            ),
+            true,
+        )]);
+
+        let logical_schema = Schema::new(vec![Field::new(
+            "struct_col",
+            DataType::Struct(
+                vec![
+                    Field::new("existing_field", DataType::Int32, true),
+                    Field::new("missing_field", DataType::Utf8, true),
+                ]
+                .into(),
+            ),
+            true,
+        )]);
+
+        let rewriter = DefaultPhysicalExprAdapterRewriter {
+            logical_file_schema: &logical_schema,
+            physical_file_schema: &physical_schema,
+            partition_fields: &[],
+        };
+
+        // Test that when a field exists in physical schema, it returns None
+        let column = Arc::new(Column::new("struct_col", 0)) as Arc<dyn PhysicalExpr>;
+        let result = rewriter.try_rewrite_struct_field_access(&column).unwrap();
+        assert!(result.is_none());
+
+        // The actual test for the get_field expression would require creating a proper ScalarFunctionExpr
+        // with ScalarUDF, which is complex to set up in a unit test. The integration tests in
+        // datafusion/core/tests/parquet/schema_adapter.rs provide better coverage for this functionality.
+    }
 }
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 845c358d7e..46f7b30d01 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -38,7 +38,6 @@ mod partitioning;
 mod physical_expr;
 pub mod planner;
 mod scalar_function;
-pub mod schema_rewriter;
 pub mod simplifier;
 pub mod statistics;
 pub mod utils;
@@ -70,7 +69,7 @@ pub use datafusion_physical_expr_common::sort_expr::{
 pub use planner::{create_physical_expr, create_physical_exprs};
 pub use scalar_function::ScalarFunctionExpr;
-pub use schema_rewriter::DefaultPhysicalExprAdapter;
+pub use simplifier::PhysicalExprSimplifier;
 pub use utils::{conjunction, conjunction_opt, split_conjunction};
 
 // For backwards compatibility
diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs
index e185cca752..7a3c463f25 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -165,6 +165,29 @@ impl ScalarFunctionExpr {
     pub fn config_options(&self) -> &ConfigOptions {
         &self.config_options
     }
+
+    /// Given an arbitrary PhysicalExpr attempt to downcast it to a ScalarFunctionExpr
+    /// and verify that its inner function is of type T.
+    /// If the downcast fails, or the function is not of type T, returns `None`.
+    /// Otherwise returns `Some(ScalarFunctionExpr)`.
+    pub fn try_downcast_func<T>(expr: &dyn PhysicalExpr) -> Option<&ScalarFunctionExpr>
+    where
+        T: 'static,
+    {
+        match expr.as_any().downcast_ref::<ScalarFunctionExpr>() {
+            Some(scalar_expr)
+                if scalar_expr
+                    .fun()
+                    .inner()
+                    .as_any()
+                    .downcast_ref::<T>()
+                    .is_some() =>
+            {
+                Some(scalar_expr)
+            }
+            _ => None,
+        }
+    }
 }
 
 impl fmt::Display for ScalarFunctionExpr {
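A short sketch of how the `ScalarFunctionExpr::try_downcast_func` helper added above can be used to detect a specific UDF behind a generic `PhysicalExpr`, as the schema rewriter does for `get_field`; the `is_get_field_call` helper name is illustrative and not part of the commit:

```rust
use datafusion_functions::core::getfield::GetFieldFunc;
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

/// Returns true if `expr` is a call to the `get_field` UDF.
/// (Illustrative helper, not part of this commit.)
fn is_get_field_call(expr: &dyn PhysicalExpr) -> bool {
    // `try_downcast_func` only returns Some when `expr` is a ScalarFunctionExpr
    // whose inner UDF implementation downcasts to `GetFieldFunc`.
    match ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(expr) {
        // The argument list is available for further inspection, e.g. the
        // struct column and the accessed field name.
        Some(call) => !call.args().is_empty(),
        None => false,
    }
}
```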
diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md
index 57ce6da117..5c352c1a6e 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -154,6 +154,24 @@ impl AsyncScalarUDFImpl for AskLLM {
 # */
 ```
 
+### Schema Rewriter Module Moved to New Crate
+
+The `schema_rewriter` module and its associated symbols have been moved from `datafusion_physical_expr` to a new crate `datafusion_physical_expr_adapter`. This affects the following symbols:
+
+- `DefaultPhysicalExprAdapter`
+- `DefaultPhysicalExprAdapterFactory`
+- `PhysicalExprAdapter`
+- `PhysicalExprAdapterFactory`
+
+To upgrade, change your imports to:
+
+```rust
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory,
+    PhysicalExprAdapter, PhysicalExprAdapterFactory
+};
+```
+
 ### Upgrade to arrow `56.0.0` and parquet `56.0.0`
 
 This version of DataFusion upgrades the underlying Apache Arrow implementation
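For downstream crates, a minimal before/after sketch of the import migration described in the upgrade guide above; the old path is the one removed throughout this diff, and the new crate must be added as a dependency:

```rust
// Before: re-exported from datafusion-physical-expr.
// use datafusion_physical_expr::schema_rewriter::{
//     DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
// };

// After: the same types now live in the dedicated datafusion-physical-expr-adapter crate.
use datafusion_physical_expr_adapter::{
    DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter,
    PhysicalExprAdapterFactory,
};
```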